# project 1

## project info
 * @Description:
 * Requirement: convert the IPs in the http.log file to locations (for example, 122.228.96.111 maps to Wenzhou) and count the total number of visits per city
 * http.log data format: timestamp, IP address, visited site, request data, browser info
 * http.log sample record: 20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59  from: http://bsalsa.com/ )|http://show.51.com/main.php|
 * @Author zj
 * @Time
 * @code python


In [1]:
import findspark
findspark.init()

In [2]:
import pyspark 

from pyspark import SparkContext,SparkConf
from pyspark.sql import HiveContext,SparkSession
from pyspark.sql.functions import split,row_number
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.window import *

In [3]:
spark = pyspark.sql.SparkSession.builder \
    .master("local[4]") \
    .appName("Feature Engineering") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.default.parallelism", "4") \
    .enableHiveSupport() \
    .getOrCreate()

In [35]:
spark.sparkContext.setLogLevel('warn')

In [36]:
sc = spark.sparkContext

In [37]:
city_id_rdd = sc.textFile(r"data/ip.dat").map(lambda x: x.split("|")).map(
        lambda x: (x[2], x[3],x[7], x[13], x[14]))

In [38]:
city_broadcast = sc.broadcast(city_id_rdd.collect())


In [39]:
dest_data = sc.textFile(r"data/http.log").map(
        lambda x: x.split("|")[1])

In [40]:
# Convert a dotted IPv4 address to a 32-bit integer
def ip_transform(ip):
    ips = ip.split(".")  # e.g. ["223", "243", "0", "0"], one byte each
    ip_num = 0
    for i in ips:
        ip_num = int(i) | ip_num << 8
    return ip_num


# Binary search for the index of the row whose [start, end] range contains the IP
def binary_search(ip_num, broadcast_value):
    start = 0
    end = len(broadcast_value) - 1
    while start <= end:
        mid = (start + end) // 2
        if ip_num < int(broadcast_value[mid][0]):
            end = mid - 1    # exclude mid, otherwise the loop can stall
        elif ip_num > int(broadcast_value[mid][1]):
            start = mid + 1
        else:
            return mid
    return None  # no range contains this IP
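
To make the lookup concrete, here is a minimal, Spark-free sketch of the same idea. The three ranges in `ranges` are made-up illustration values, not rows from `ip.dat`, and `ip_to_int` and `find_range` are hypothetical helpers; `find_range` uses the standard-library `bisect` module instead of the hand-written loop above, and it assumes the ranges are sorted and do not overlap.

```python
from bisect import bisect_right

def ip_to_int(ip):
    """Pack a dotted-quad IPv4 string into a 32-bit integer."""
    num = 0
    for part in ip.split("."):
        num = (num << 8) | int(part)
    return num

# Hypothetical, sorted, non-overlapping (start, end, city) ranges.
ranges = [
    (ip_to_int("1.0.0.0"), ip_to_int("1.0.0.255"), "city-a"),
    (ip_to_int("1.0.1.0"), ip_to_int("1.0.3.255"), "city-b"),
    (ip_to_int("10.0.0.0"), ip_to_int("10.255.255.255"), "city-c"),
]
starts = [r[0] for r in ranges]

def find_range(ip_num):
    """Return the index of the range containing ip_num, or None."""
    # Rightmost range whose start is <= ip_num.
    i = bisect_right(starts, ip_num) - 1
    if i >= 0 and ranges[i][0] <= ip_num <= ranges[i][1]:
        return i
    return None

print(find_range(ip_to_int("1.0.2.7")))   # falls inside the second range
print(find_range(ip_to_int("9.9.9.9")))   # falls in a gap between ranges
```

Returning `None` for an address in a gap keeps the "not found" case explicit, so the caller has to decide how to count it.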


In [41]:
def get_pos(x):
    city_broadcast_value = city_broadcast.value

    # Look up the location for a single IP
    def get_result(ip):
        ip_num = ip_transform(ip)
        index = binary_search(ip_num, city_broadcast_value)
        if index is None:
            return ("unknown", 1)  # IP not covered by any range in ip.dat
        # ((location), 1)
        return (city_broadcast_value[index][2], 1)

    return (get_result(ip) for ip in x)



In [42]:
# Similar to map, but map passes records to the function one at a time,
# while mapPartitions passes the data one partition at a time
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x))  # ((location), 1)

result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
print(result_rdd.collect())

[('重庆', 868), ('石家庄', 383), ('昆明', 126), ('北京', 1535), ('西安', 1824)]
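
The per-partition style pays off most when each call needs some setup work that would be wasteful to repeat for every record. The sketch below is a plain-Python illustration of that pattern, not Spark code: `load_lookup` is a hypothetical stand-in for an expensive setup step, and the two lists in `partitions` are made-up data standing in for the iterators that `mapPartitions` would pass in.

```python
from itertools import chain

setup_calls = 0

def load_lookup():
    """Stand-in for an expensive setup step (hypothetical)."""
    global setup_calls
    setup_calls += 1
    return {"1.1.1.1": "city-a", "2.2.2.2": "city-b"}

def per_partition(ips):
    lookup = load_lookup()          # runs once per partition, not per record
    for ip in ips:                  # lazily yields one pair per record
        yield (lookup.get(ip, "unknown"), 1)

# Two made-up partitions, each handed to the function as an iterator.
partitions = [["1.1.1.1", "2.2.2.2"], ["1.1.1.1", "3.3.3.3"]]
pairs = list(chain.from_iterable(per_partition(iter(p)) for p in partitions))

print(pairs)
print(setup_calls)
```

With four records in two partitions, the setup step runs twice rather than four times.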



In [41]:
spark.stop()
sc.stop()

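# project 2: log analysis

- project info 

Log format: IP, cache hit status (HIT/MISS), response time, request time, request method, request URL, request protocol, status code, response size, referer, user agent

Log file location: data/cdn.txt

- Data format: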


`100.79.121.48 HIT 33 [15/Feb/2017:00:00:46 +0800] "GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174055 "http://www.abc.com.cn/" "Mozilla/4.0+(compatible;+MSIE+6.0;+Windows+NT+5.1;+Trident/4.0;)"`
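   - Terminology:

      *  PV (page view): the number of page views; a metric for a website or a single page

      * UV (unique visitor): the number of distinct IP addresses that visited a site or clicked a news item

   - Tasks:

    - 2.1 Count the unique IPs

    - 2.2 Count the unique IPs for each video (videos are identified by `*.mp4` in some log lines; each one is a video file)

    - 2.3 Compute the traffic for each hour of the day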
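
Before moving to Spark, the three tasks can be sketched in plain Python on a few lines. This is a minimal illustration, assuming the field layout described above: the regular expression and its group names are this sketch's own, the first entry in `lines` is the sample record from the text, and the other two are made-up records.

```python
import re
from collections import defaultdict

# Pattern for the access-log layout described above; the group names are
# this sketch's own labels, not field names taken from the source.
LOG_RE = re.compile(
    r'(?P<ip>\S+) (?P<hit>\S+) (?P<resp_time>\d+) '
    r'\[(?P<day>[^:]+):(?P<hour>\d{2}):\d{2}:\d{2} [^\]]+\] '
    r'"(?P<method>\S+) (?P<url>\S+) (?P<proto>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\d+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"'
)

lines = [
    # Sample record from the text, followed by two made-up records.
    '100.79.121.48 HIT 33 [15/Feb/2017:00:00:46 +0800] "GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174055 "http://www.abc.com.cn/" "Mozilla/4.0+(compatible;+MSIE+6.0;+Windows+NT+5.1;+Trident/4.0;)"',
    '100.79.121.48 MISS 10 [15/Feb/2017:01:10:00 +0800] "GET http://cdn.v.abc.com.cn/1.mp4 HTTP/1.1" 200 1000 "-" "agent"',
    '10.0.0.1 HIT 12 [15/Feb/2017:01:20:00 +0800] "GET http://cdn.v.abc.com.cn/1.mp4 HTTP/1.1" 200 2000 "-" "agent"',
]

# Keep only the lines that match the expected layout.
records = [m.groupdict() for m in map(LOG_RE.match, lines) if m]

pv = len(records)                          # every request counts once
uv = len({r["ip"] for r in records})       # task 2.1: distinct client IPs

ips_per_video = defaultdict(set)           # task 2.2
bytes_per_hour = defaultdict(int)          # task 2.3
for r in records:
    if r["url"].endswith(".mp4"):
        ips_per_video[r["url"].rsplit("/", 1)[-1]].add(r["ip"])
    bytes_per_hour[r["hour"]] += int(r["size"])

print(pv, uv)
print({video: len(ips) for video, ips in ips_per_video.items()})
print(dict(bytes_per_hour))
```

Parsing with a regular expression is one option; splitting on spaces also works for this layout as long as the URL itself contains no spaces.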



In [11]:
# Import the required packages
import findspark
findspark.init()
import pyspark 

from pyspark import SparkContext,SparkConf
from pyspark.sql import HiveContext,SparkSession
from pyspark.sql.functions import split,row_number
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.window import *


In [3]:
"""
Create the Spark session; the app name is Project2
"""
spark = pyspark.sql.SparkSession.builder \
    .master("local[*]") \
    .appName("Project2") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.default.parallelism", "4") \
    .config("spark.sql.execution.arrow.enabled", "true")\
    .enableHiveSupport() \
    .getOrCreate()

In [4]:
spark.sparkContext.setLogLevel('warn')
sc = spark.sparkContext

In [5]:
sc.setLogLevel("warn")

In [6]:
inputs = sc.textFile(r"data/cdn.txt").map(lambda x: x.split(" ")).map(lambda x:(x[0],1))

In [7]:
# Top 10 IPs by request count
inputs.reduceByKey(lambda a, b: a + b).sortBy(keyfunc=(lambda x:x[1]),ascending=False).take(10)

[('114.55.227.102', 9348),
 ('220.191.255.197', 2640),
 ('115.236.173.94', 2476),
 ('183.129.221.102', 2187),
 ('112.53.73.66', 1794),
 ('115.236.173.95', 1650),
 ('220.191.254.129', 1278),
 ('218.88.25.200', 751),
 ('183.129.221.104', 569),
 ('115.236.173.93', 529)]

In [9]:
# Re-read the file: key = request URL, value = client IP
input2 = sc.textFile(r"data/cdn.txt").map(lambda x: x.split(" ")).map(lambda x:(x[6],x[0]))

In [13]:
import re
# Match URLs that contain a .mp4 file name (raw string avoids invalid-escape warnings)
restr = re.compile(r".+/(\S+\.mp4)")

In [28]:
input2.filter(lambda x: restr.match(x[0])).take(5)

[('http://cdn.v.abc.com.cn/141011.mp4', '115.192.186.231'),
 ('http://cdn.v.abc.com.cn/140987.mp4', '125.105.41.123'),
 ('http://cdn.v.abc.com.cn/140987.mp4', '211.138.116.41'),
 ('http://cdn.v.abc.com.cn/140853.mp4', '60.181.47.255'),
 ('http://cdn.v.abc.com.cn/140987.mp4', '115.192.186.231')]

In [27]:
# Unique IPs per video
# Extract the video file name first, then group by key and count the distinct IPs
input2.filter(lambda x: restr.match(x[0]))\
.map(lambda x: (x[0].split("//")[1].split("/")[1],x[1]))\
.groupByKey()\
.mapValues(lambda x:len(set(x)))\
.sortBy(keyfunc=(lambda x:x[1]),ascending=False)\
.take(10)

[('141081.mp4', 2393),
 ('140995.mp4', 2050),
 ('141027.mp4', 1784),
 ('141090.mp4', 1702),
 ('141032.mp4', 1528),
 ('89973.mp4', 1522),
 ('141080.mp4', 1425),
 ('141035.mp4', 1321),
 ('141082.mp4', 1272),
 ('140938.mp4', 814)]

In [41]:
# Re-read the file: key = hour of the request, value = response size in bytes
input3 = sc.textFile(r"data/cdn.txt").map(lambda x: x.split(" ")).map(lambda x:(x[3].split(":")[1],int(x[9])))

In [48]:
input3.reduceByKey(lambda a, b: a + b).mapValues(lambda x: str(x/1024/1024/1024) + "G").sortBy(keyfunc=(lambda x:x[0])).collect()

[('00', '15.453603968955576G'),
 ('01', '3.905158909969032G'),
 ('02', '5.195773116312921G'),
 ('03', '3.6369203999638557G'),
 ('04', '3.2884229784831405G'),
 ('05', '5.021793280728161G'),
 ('06', '11.671565165743232G'),
 ('07', '22.00996015779674G'),
 ('08', '43.230181308463216G'),
 ('09', '52.83912879694253G'),
 ('10', '61.57396282441914G'),
 ('11', '45.854314849711955G'),
 ('12', '47.00132285710424G'),
 ('13', '51.02019470091909G'),
 ('14', '55.84231084771454G'),
 ('15', '45.46238579135388G'),
 ('16', '45.94618751946837G'),
 ('17', '44.81707710213959G'),
 ('18', '45.33248774614185G'),
 ('19', '51.932280966080725G'),
 ('20', '55.40267270896584G'),
 ('21', '53.74120971187949G'),
 ('22', '42.061507997103035G'),
 ('23', '25.0245084669441G')]

# project 3

**3. Spark interview question**

Assume each record in the click log file (click.log) has the following format:

~~~
INFO 2019-09-01 00:29:53 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:30:31 requestURI:/click?app=2&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:31:03 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=32
INFO 2019-09-01 00:31:51 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=33
~~~

The impression log (imp.log) has the following format:

~~~
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=34
~~~

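3.1 Use Spark Core to count the impressions and clicks for each adid, and write the result to a file on HDFS.

The output columns are adid, impressions, clicks. Note: no data may be lost (some adids have impressions but no clicks, and some have clicks but no impressions).

3.2 How many shuffles does your code perform, and can that number be reduced?

(Hint: a single shuffle is optimal.)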
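
One way to reach a single shuffle is to tag every impression as `(1, 0)` and every click as `(0, 1)`, union the two streams (a narrow operation in Spark), and sum the pairs per adid in one `reduceByKey`. The Spark-free sketch below mimics that flow on the sample lines above; `to_pair` is a hypothetical helper, and taking the last `adid` parameter as the key is an assumption about which of the two values identifies the ad.

```python
from collections import defaultdict
from urllib.parse import urlparse, parse_qs

click_lines = [
    "INFO 2019-09-01 00:29:53 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=31",
    "INFO 2019-09-01 00:30:31 requestURI:/click?app=2&p=1&adid=18005472&industry=469&adid=31",
    "INFO 2019-09-01 00:31:03 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=32",
    "INFO 2019-09-01 00:31:51 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=33",
]
imp_lines = [
    "INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31",
    "INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31",
    "INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=34",
]

def to_pair(line, is_click):
    """Map one log line to (adid, (impressions, clicks))."""
    query = urlparse(line.split(" ")[3]).query
    adid = parse_qs(query)["adid"][-1]   # assumption: the last adid is the key
    return adid, ((0, 1) if is_click else (1, 0))

# "union": concatenate both tagged streams; nothing is regrouped here.
pairs = [to_pair(l, True) for l in click_lines] + \
        [to_pair(l, False) for l in imp_lines]

# "reduceByKey": the only step that groups records by key.
totals = defaultdict(lambda: (0, 0))
for adid, (imp, clk) in pairs:
    old_imp, old_clk = totals[adid]
    totals[adid] = (old_imp + imp, old_clk + clk)

# Output layout requested by the task: adid, impressions, clicks.
rows = [f"{adid},{imp},{clk}" for adid, (imp, clk) in sorted(totals.items())]
print(rows)
```

Because every adid from either log enters the same keyed sum, ads with only clicks or only impressions keep a zero in the other column instead of being dropped. Aggregating each log separately and then joining the results would need at least two shuffles.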



In [76]:
import urllib.parse as urlpar

In [59]:
# Read the data
clickLog = sc.textFile("data/click.log")
impLog = sc.textFile("data/imp.log")

In [85]:
# Key each record by its second adid parameter; values are (clicks, impressions)
clkRDD = clickLog.map(lambda line: line.split(" ")).map(lambda x: urlpar.urlparse(x[3])).map(lambda x: (urlpar.parse_qs(x.query)['adid'][1],(1,0) ))
impRDD= impLog.map(lambda line: line.split(" ")).map(lambda x: urlpar.urlparse(x[3])).map(lambda x: (urlpar.parse_qs(x.query)['adid'][1],(0,1) ))

In [90]:
clkRDD.union(impRDD).reduceByKey(lambda x , y : (x[0] + y[0] , x[1] + y[1] ) ).collect()

[('31', (2, 2)), ('32', (1, 0)), ('33', (1, 0)), ('34', (0, 1))]

In [92]:
impLog.map(lambda line: line.split(" ")).map(lambda x: urlpar.urlparse(x[3])).map(lambda x: (urlpar.parse_qs(x.query)['adid'][1],(0,1) )).take(2)

[('31', (0, 1)), ('31', (0, 1))]

In [91]:
clickLog.map(lambda line: line.split(" ")).map(lambda x: urlpar.urlparse(x[3])).map(lambda x: (urlpar.parse_qs(x.query)['adid'][1],(1,0) )).take(2)

[('31', (1, 0)), ('31', (1, 0))]

# project 6

6. Table A has three fields (ID, startdate, enddate) and contains 3 rows:

~~~
1 2019-03-04 2020-02-03
2 2020-04-05 2020-08-04
3 2019-10-09 2020-06-11
~~~

Write a query (both a SQL and a DSL version are required) that transforms the data above into:

~~~
2019-03-04	2019-10-09
2019-10-09	2020-02-03
2020-02-03	2020-04-05
2020-04-05	2020-06-11
2020-06-11	2020-08-04
2020-08-04	2020-08-04
~~~
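
The target table pairs every date, start or end, with the next date in sorted order, and pairs the last date with itself. A minimal plain-Python sketch of that rule, using the three rows above (the variable names are this sketch's own):

```python
rows = [
    (1, "2019-03-04", "2020-02-03"),
    (2, "2020-04-05", "2020-08-04"),
    (3, "2019-10-09", "2020-06-11"),
]

# Flatten start and end dates into one sorted list. ISO-formatted dates
# sort correctly as plain strings.
dates = sorted(d for _, start, end in rows for d in (start, end))

# Pair each date with its successor; the last date has none, so it is
# paired with itself.
pairs = list(zip(dates, dates[1:] + dates[-1:]))

for left, right in pairs:
    print(left, right)
```

In window-function terms this is the maximum over the current row and the one following it, ordered by date.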


In [177]:
from pyspark.sql import Row
import pyspark.sql.functions as F

In [108]:
data = ["1 2019-03-04 2020-02-03", "2 2020-04-05 2020-08-04", "3 2019-10-09 2020-06-11"]

In [137]:
# Drop the ID, flatten the start and end dates into one column, and sort by the full date string
ds = sc.parallelize(data).map(lambda x : " ".join(x.split(" ")[1:]) ).flatMap( lambda strs : re.split("\\s+",strs)).sortBy(keyfunc=(lambda x:x))

In [161]:
df = ds.map(lambda x: (x, )).toDF()

In [163]:
df = df.withColumnRenamed('_1','value')

In [182]:
# DSL
df.withColumn("new", F.max("value").over(Window.orderBy("value").rowsBetween(0,1))).show()

+----------+----------+
|     value|       new|
+----------+----------+
|2019-03-04|2019-10-09|
|2019-10-09|2020-02-03|
|2020-02-03|2020-04-05|
|2020-04-05|2020-06-11|
|2020-06-11|2020-08-04|
|2020-08-04|2020-08-04|
+----------+----------+



In [166]:
df.createOrReplaceTempView("t1")

In [171]:
# SQL
spark.sql("""
select value,max(value) over (order by value rows between current row and 1 following) maxvalues
 from t1
""").show()

+----------+----------+
|     value| maxvalues|
+----------+----------+
|2019-03-04|2019-10-09|
|2019-10-09|2020-02-03|
|2020-02-03|2020-04-05|
|2020-04-05|2020-06-11|
|2020-06-11|2020-08-04|
|2020-08-04|2020-08-04|
+----------+----------+



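# KNN/KMeans implementation

**4. Implement the KNN algorithm on the Iris dataset**



**5. Implement the KMeans algorithm on the Iris dataset**

Notes:

- For details on 4 and 5, see the video instructions
- File location: data/Iris.csv
- Organize the files on top of the original dataset as described in the video


## Model implementation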

In [94]:
from sklearn import datasets
from sklearn.neighbors import KNeighborsClassifier
from sklearn.cluster import KMeans
from pyspark.sql.functions import udf, col

In [98]:
# Load the iris dataset
iris = datasets.load_iris()


In [100]:
neigh = KNeighborsClassifier(n_neighbors = 3)
neigh.fit(iris.data,iris.target)
print("Prediction: "+str(neigh.predict([[5,3,5,2]]))) 

Prediction: [2]
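
The cell above relies on scikit-learn. For reference, the core of KNN fits in a few lines: sort the training points by distance to the query and take a majority vote among the nearest `k`. The sketch below is a minimal from-scratch version on made-up 2-D toy points (not Iris measurements); `knn_predict` is a hypothetical helper, and it uses plain Euclidean distance via `math.dist` (Python 3.8+).

```python
import math
from collections import Counter

def knn_predict(train_x, train_y, query, k=3):
    """Classify `query` by majority vote among its k nearest training points."""
    by_distance = sorted(
        zip(train_x, train_y),
        key=lambda pair: math.dist(pair[0], query),
    )
    votes = Counter(label for _, label in by_distance[:k])
    return votes.most_common(1)[0][0]

# Made-up toy points: two well-separated groups.
train_x = [(1.0, 1.0), (1.2, 0.8), (0.9, 1.1), (5.0, 5.0), (5.2, 4.9), (4.8, 5.1)]
train_y = [0, 0, 0, 1, 1, 1]

print(knn_predict(train_x, train_y, (1.1, 1.0)))
print(knn_predict(train_x, train_y, (4.9, 5.0)))
```

Sorting all points is fine for a dataset the size of Iris; larger datasets usually call for a spatial index instead.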


In [99]:
kmeans = KMeans(n_clusters = 3,random_state = 0).fit(iris.data)
print("kmeans:"+str(kmeans.labels_))  # print the cluster assignments
print("target:"+str(iris.target)) 


kmeans:[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 2 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 0 2 2 2 2 0 2 2 2 2
 2 2 0 0 2 2 2 2 0 2 0 2 0 2 2 0 0 2 2 2 2 2 0 2 2 2 2 0 2 2 2 0 2 2 2 0 2
 2 0]
target:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2]
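
KMeans cluster ids are arbitrary, so the two arrays above cannot be compared position by position: cluster `1` here lines up with target `0`. One way to compare them is to map each cluster to the most common true label among its members and then measure agreement. The sketch below shows that step on short made-up label vectors (not the Iris output); `align_clusters` is a hypothetical helper.

```python
from collections import Counter, defaultdict

def align_clusters(cluster_ids, true_labels):
    """Map each cluster id to the most common true label among its members."""
    members = defaultdict(list)
    for c, t in zip(cluster_ids, true_labels):
        members[c].append(t)
    return {c: Counter(ts).most_common(1)[0][0] for c, ts in members.items()}

# Made-up label vectors, shaped like the output above.
cluster_ids = [1, 1, 1, 0, 0, 2, 2, 2, 0]
true_labels = [0, 0, 0, 1, 1, 1, 2, 2, 2]

mapping = align_clusters(cluster_ids, true_labels)
aligned = [mapping[c] for c in cluster_ids]
agreement = sum(a == t for a, t in zip(aligned, true_labels)) / len(true_labels)

print(mapping)
print(agreement)
```

Majority mapping can assign two clusters to the same label when clusters overlap heavily, so treat the agreement figure as a rough check rather than a formal accuracy.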
