In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, lag, isnull, when
from pyspark.sql.window import Window

In [2]:
%%time

spark = SparkSession.builder \
                    .appName("ctr") \
                    .config("spark.cores.local", 4) \
                    .config("spark.shell.driver-memory", "8G") \
                    .getOrCreate()

CPU times: user 28 ms, sys: 0 ns, total: 28 ms
Wall time: 2.9 s


In [3]:
dirname = '/home4/yannick4/tmp'

%time df = spark.read.parquet(dirname + "/train.parquet/", )

df

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.76 s


DataFrame[id: decimal(20,0), click: int, hour: int, C1: int, banner_pos: int, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_id: string, device_ip: string, device_model: string, device_type: int, device_conn_type: int, C14: int, C15: int, C16: int, C17: int, C18: int, C19: int, C20: int, C21: int]

In [4]:
%time df.head(3)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.04 s


[Row(id=Decimal('13438938916556439466'), click=1, hour=14102916, C1=1005, banner_pos=1, site_id='5b4d2eda', site_domain='16a36ef3', site_category='f028772b', app_id='ecad2386', app_domain='7801e8d9', app_category='07d7df22', device_id='a99f214a', device_ip='fd74ef0a', device_model='8a4875bd', device_type=1, device_conn_type=0, C14=21882, C15=320, C16=50, C17=2526, C18=0, C19=167, C20=-1, C21=221),
 Row(id=Decimal('13439050860308297569'), click=0, hour=14102916, C1=1005, banner_pos=0, site_id='85f751fd', site_domain='c4e18dd6', site_category='50e219e0', app_id='51cedd4e', app_domain='aefc06bd', app_category='0f2161f8', device_id='a99f214a', device_ip='bec514f4', device_model='542422a7', device_type=1, device_conn_type=0, C14=21611, C15=320, C16=50, C17=2480, C18=3, C19=297, C20=100111, C21=61),
 Row(id=Decimal('1343908897585470171'), click=0, hour=14102916, C1=1005, banner_pos=0, site_id='85f751fd', site_domain='c4e18dd6', site_category='50e219e0', app_id='de97da65', app_domain='33da2e7

In [5]:
%time df.groupBy("click").count().show()

+-----+--------+
|click|   count|
+-----+--------+
|    1| 6865066|
|    0|33563901|
+-----+--------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.82 s


In [6]:
%time df.groupBy("banner_pos").agg({'id': 'count', 'click': 'avg'}).sort('banner_pos').show()

+----------+---------+-------------------+
|banner_pos|count(id)|         avg(click)|
+----------+---------+-------------------+
|         0| 29109590|0.16427235835338114|
|         1| 11247282|0.18361449459522755|
|         2|    13001|0.11922159833858934|
|         3|     2035| 0.1828009828009828|
|         4|     7704| 0.1853582554517134|
|         5|     5778|0.12149532710280374|
|         7|    43577| 0.3201000527801363|
+----------+---------+-------------------+

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 5.05 s


In [7]:
%time df.createOrReplaceTempView("train")

%time spark.sql("SELECT COUNT(*) FROM train").show()

%time spark.sql("SELECT click, COUNT(*) AS cnt from train GROUP BY click ORDER BY click").show()

%time spark.sql("\
SELECT banner_pos, COUNT(*) AS cnt, AVG(click) AS click \
FROM   train \
GROUP  BY banner_pos \
ORDER  BY banner_pos").show()

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 157 ms
+--------+
|count(1)|
+--------+
|40428967|
+--------+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 436 ms
+-----+--------+
|click|     cnt|
+-----+--------+
|    0|33563901|
|    1| 6865066|
+-----+--------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.02 s
+----------+--------+-------------------+
|banner_pos|     cnt|              click|
+----------+--------+-------------------+
|         0|29109590|0.16427235835338114|
|         1|11247282|0.18361449459522755|
|         2|   13001|0.11922159833858934|
|         3|    2035| 0.1828009828009828|
|         4|    7704| 0.1853582554517134|
|         5|    5778|0.12149532710280374|
|         7|   43577| 0.3201000527801363|
+----------+--------+-------------------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.28 s


In [8]:
%time df.groupBy("device_type").agg({'id': 'count', 'click': 'avg'}).sort(desc("count(id)")).show()

+-----------+---------+-------------------+
|device_type|count(id)|         avg(click)|
+-----------+---------+-------------------+
|          1| 37304667|0.16917577631774597|
|          0|  2220812|0.21073148019733323|
|          4|   774272|0.09544449495784427|
|          5|   129185|0.09384216433796494|
|          2|       31|0.06451612903225806|
+-----------+---------+-------------------+

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 3.65 s


In [9]:
%time device_id_type = df.groupBy("device_id", "device_type").agg({'id': 'count'})
%time device_id = device_id_type.groupBy("device_id").agg({'*': 'count', 'count(id)': 'sum'})

%time device_id.head(3)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 9.65 ms
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 7.32 ms
CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 9.81 s


[Row(device_id='8b9dfa40', sum(count(id))=2, count(1)=1),
 Row(device_id='bf886e47', sum(count(id))=1, count(1)=1),
 Row(device_id='86eb71fe', sum(count(id))=3, count(1)=1)]

In [10]:
%time device_id.filter(device_id["count(1)"] >=2) \
               .groupBy("count(1)") \
               .agg({'device_id': 'count'}) \
               .sort(desc('count(1)')).show()

+--------+----------------+
|count(1)|count(device_id)|
+--------+----------------+
|       4|               1|
|       3|              13|
|       2|            1465|
+--------+----------------+

CPU times: user 0 ns, sys: 8 ms, total: 8 ms
Wall time: 9 s


In [26]:
# Extract day-of-month and hour-of-day from the integer `hour` column.
# `hour` looks like YYMMDDHH (e.g. 14102916 -> day 29, hour 16); TODO confirm for all rows.
# The substrings are cast to int so the columns match their names and the SQL version
# (cast(substr(...) as integer)); previously they were strings.
train3 = (
    df
    .withColumn("int_day", df.hour.cast("string").substr(5, 2).cast("int"))
    .withColumn("int_hour", df.hour.cast("string").substr(7, 2).cast("int"))
)

train3.select("device_id", "int_day", "int_hour").show()

+---------+-------+--------+
|device_id|int_day|int_hour|
+---------+-------+--------+
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| 058cb1c3|     29|      16|
| caf850ad|     29|      16|
| 20c5f715|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
| 74d0d05a|     29|      16|
| 5b0ced18|     29|      16|
| a99f214a|     29|      16|
| a99f214a|     29|      16|
+---------+-------+--------+
only showing top 20 rows



In [28]:
# Sorted projection of the device/time columns. It is not used to compute
# device_plus_dt below: the window orders its own partitions.
train4 = (
    train3
    .sort('device_id', 'device_ip', 'int_day', 'int_hour')
    .select('device_id', 'device_ip', 'int_day', 'int_hour')
)

# Rows of the same (device_id, device_ip) on the same day, in hour order.
same_device_same_day = (
    Window
    .partitionBy("device_id", "device_ip", "int_day")
    .orderBy("int_hour")
)

# dt_hour = hours since the previous row in the same window partition; null for the
# first row of each partition. Columns are taken from train3 itself. The original used
# train4's columns here, which only resolved because both frames share attribute ids.
device_plus_dt = train3.withColumn(
    'dt_hour',
    train3.int_hour.cast("int")
    - lag(train3.int_hour).over(same_device_same_day).cast('int'),
)

In [30]:
%%time

device_plus_dt \
    .filter(isnull(device_plus_dt.dt_hour)) \
    .count()

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 31.1 s


11411016

In [66]:
spark.sql(" \
select *, cast(substr(hour, 5, 2) as integer) as int_day, cast(substr(hour, 7, 2) as integer) as int_hour from train") \
    .createOrReplaceTempView("train3")
    
spark.sql(" \
select *, \
(int_hour - (lag(int_hour) over (partition by device_id, device_ip, int_day order by int_hour))) as dt_hour \
from train3").createOrReplaceTempView("device_plus_dt")

%time spark.sql("\
select count(*) from device_plus_dt where dt_hour is null").show()

+--------+
|count(1)|
+--------+
|11411016|
+--------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 24.4 s


In [48]:
%time spark.sql(" \
SELECT * \
FROM train \
WHERE device_ip = '8a014cbb' ").createOrReplaceTempView("filter_8a")

%time spark.sql(" \
SELECT substring(hour, 1, 6) AS day, \
       count(*) AS clicks_per_day \
FROM filter_8a \
 GROUP BY day \
 ORDER BY day").createOrReplaceTempView("count_8a")

%time spark.sql("select * from count_8a limit 6").show()

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 12.6 ms
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 19.8 ms
+------+--------------+
|   day|clicks_per_day|
+------+--------------+
|141021|          7257|
|141022|          8115|
|141023|          7163|
|141024|          6762|
|141025|          6698|
|141026|          7339|
+------+--------------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 2.2 s


In [69]:
%time count_device_ip = spark.sql("\
select device_ip, count(*) as cnt from train group by device_ip order by cnt desc").collect()

count_device_ip[:20]

CPU times: user 30.1 s, sys: 556 ms, total: 30.7 s
Wall time: 1min 9s


[Row(device_ip='6b9769f2', cnt=208701),
 Row(device_ip='431b3174', cnt=135322),
 Row(device_ip='2f323f36', cnt=88499),
 Row(device_ip='af9205f9', cnt=87844),
 Row(device_ip='930ec31d', cnt=86996),
 Row(device_ip='af62faf4', cnt=85802),
 Row(device_ip='009a7861', cnt=85382),
 Row(device_ip='285aa37d', cnt=85313),
 Row(device_ip='6394f6f6', cnt=83184),
 Row(device_ip='d90a7774', cnt=82980),
 Row(device_ip='c6563308', cnt=71290),
 Row(device_ip='57cd4006', cnt=70449),
 Row(device_ip='75bb1b58', cnt=70262),
 Row(device_ip='1cf29716', cnt=70028),
 Row(device_ip='ddd2926e', cnt=69816),
 Row(device_ip='0489ce3f', cnt=69756),
 Row(device_ip='a8536f3a', cnt=69594),
 Row(device_ip='488a9a3e', cnt=69550),
 Row(device_ip='ceffea69', cnt=68731),
 Row(device_ip='8a014cbb', cnt=68506)]

In [70]:
spark.sql("\
select device_ip, count(*) as cnt from train group by device_ip order by cnt desc") \
.createOrReplaceTempView("count_device_ip")

%time spark.sql("select count(*) from count_device_ip where cnt > 10").show()

+--------+
|count(1)|
+--------+
|  462718|
+--------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 10.6 s
