## Task 0: Data

**Data:**
* The Avazu CTR prediction dataset is used:
    * https://www.kaggle.com/c/avazu-ctr-prediction/data
    * only the `train.gz` file is used
* The training set covers 10 days of data (the `hour` values run from 2014-10-21 to 2014-10-30)


**Goal:**
* Process the raw data and aggregate clicks and impressions per hour

**Outcome:**
* Parquet files: hourly impressions and hourly clicks

----

In [8]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import unix_timestamp, from_unixtime

In [2]:
# spark session
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

----

### Load data

In [3]:
# the gzip file is decompressed transparently; without a schema, every column is read as a string
dfs = spark.read.csv("./data/train.gz", header=True)

In [4]:
dfs.printSchema()

root
 |-- id: string (nullable = true)
 |-- click: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- banner_pos: string (nullable = true)
 |-- site_id: string (nullable = true)
 |-- site_domain: string (nullable = true)
 |-- site_category: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_domain: string (nullable = true)
 |-- app_category: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_ip: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- device_conn_type: string (nullable = true)
 |-- C14: string (nullable = true)
 |-- C15: string (nullable = true)
 |-- C16: string (nullable = true)
 |-- C17: string (nullable = true)
 |-- C18: string (nullable = true)
 |-- C19: string (nullable = true)
 |-- C20: string (nullable = true)
 |-- C21: string (nullable = true)



In [5]:
dfs.head()

Row(id='1000009418151094273', click='0', hour='14102100', C1='1005', banner_pos='0', site_id='1fbe01fe', site_domain='f3845767', site_category='28905ebd', app_id='ecad2386', app_domain='7801e8d9', app_category='07d7df22', device_id='a99f214a', device_ip='ddd2926e', device_model='44956a24', device_type='1', device_conn_type='2', C14='15706', C15='320', C16='50', C17='1722', C18='0', C19='35', C20='-1', C21='79')

### Number of clicks per hour

In [7]:
# cast the 0/1 click label from string to int, then sum it per hour
dft = dfs.withColumn("click", dfs["click"].cast(IntegerType()))
dft = dft.groupby("hour").sum("click").withColumnRenamed("sum(click)", "no_of_clicks")
dft.show()

+--------+------------+
|    hour|no_of_clicks|
+--------+------------+
|14102202|       19847|
|14102100|       20792|
|14102206|       46227|
|14102218|       34708|
|14102915|       34018|
|14102715|       20941|
|14102120|       18195|
|14102509|       31437|
|14102601|       15168|
|14102712|       28938|
|14102321|       17159|
|14103010|       34587|
|14102812|       44660|
|14103016|       39159|
|14102110|       35694|
|14102204|       34377|
|14102512|       38454|
|14102822|       17865|
|14102209|       62410|
|14102722|       15457|
+--------+------------+
only showing top 20 rows
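The aggregation above is a plain group-by-and-sum: because `click` is a 0/1 label, summing it per `hour` gives the number of clicks in that hour. A minimal pure-Python sketch of the same logic follows; the function name `clicks_per_hour` and the sample rows are hypothetical and not taken from the Avazu file.

```python
from collections import defaultdict


def clicks_per_hour(rows):
    """Sum the 0/1 `click` label per `hour` key, like groupby("hour").sum("click")."""
    totals = defaultdict(int)
    for row in rows:
        # Values arrive as strings (the CSV was read without a schema), so cast first.
        totals[row["hour"]] += int(row["click"])
    return dict(totals)


# Hypothetical toy rows, not taken from the Avazu data.
sample = [
    {"hour": "14102100", "click": "0"},
    {"hour": "14102100", "click": "1"},
    {"hour": "14102101", "click": "1"},
]
print(clicks_per_hour(sample))  # {'14102100': 1, '14102101': 1}
```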



In [9]:
# convert 'hour' (YYMMDDHH) to datetime: in Spark patterns MM = month, mm = minutes, HH = hour of day
dfx = (dft.withColumn('hour', from_unixtime(unix_timestamp(dft.hour, 'yyMMddHH')))
          .withColumnRenamed("hour", "date_hour"))
dfx.show()

+-------------------+------------+
|          date_hour|no_of_clicks|
+-------------------+------------+
|2014-10-22 02:00:00|       19847|
|2014-10-21 00:00:00|       20792|
|2014-10-22 06:00:00|       46227|
|2014-10-22 18:00:00|       34708|
|2014-10-29 15:00:00|       34018|
|2014-10-27 15:00:00|       20941|
|2014-10-21 20:00:00|       18195|
|2014-10-25 09:00:00|       31437|
|2014-10-26 01:00:00|       15168|
|2014-10-27 12:00:00|       28938|
|2014-10-23 21:00:00|       17159|
|2014-10-30 10:00:00|       34587|
|2014-10-28 12:00:00|       44660|
|2014-10-30 16:00:00|       39159|
|2014-10-21 10:00:00|       35694|
|2014-10-22 04:00:00|       34377|
|2014-10-25 12:00:00|       38454|
|2014-10-28 22:00:00|       17865|
|2014-10-22 09:00:00|       62410|
|2014-10-27 22:00:00|       15457|
+-------------------+------------+
only showing top 20 rows
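The `hour` value is laid out as YYMMDDHH, so `14102100` means 2014-10-21, hour 00. The pattern letters matter: Spark's datetime patterns use `MM` for month and `mm` for minutes, so a pattern with `mm` silently reads the month digits as minutes. As a quick sanity check outside Spark, the sketch below parses the same strings with Python's `datetime.strptime`, where `%m` is the month and `%H` the hour; the helper name `parse_hour` is hypothetical.

```python
from datetime import datetime


def parse_hour(value: str) -> datetime:
    """Parse an Avazu `hour` value laid out as YYMMDDHH."""
    # In strptime, %m is the month and %M the minute (Spark patterns: MM = month, mm = minute).
    return datetime.strptime(value, "%y%m%d%H")


print(parse_hour("14102100"))  # 2014-10-21 00:00:00
print(parse_hour("14102209"))  # 2014-10-22 09:00:00
```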



In [10]:
# order by date
dfx = dfx.orderBy("date_hour")
dfx.show()

+-------------------+------------+
|          date_hour|no_of_clicks|
+-------------------+------------+
|2014-10-21 00:00:00|       20792|
|2014-10-21 01:00:00|       23873|
|2014-10-21 02:00:00|       31265|
|2014-10-21 03:00:00|       32830|
|2014-10-21 04:00:00|       40026|
|2014-10-21 05:00:00|       40709|
|2014-10-21 06:00:00|       38657|
|2014-10-21 07:00:00|       35531|
|2014-10-21 08:00:00|       35220|
|2014-10-21 09:00:00|       37802|
|2014-10-21 10:00:00|       35694|
|2014-10-21 11:00:00|       35476|
|2014-10-21 12:00:00|       28775|
|2014-10-21 13:00:00|       35425|
|2014-10-21 14:00:00|       35168|
|2014-10-21 15:00:00|       34080|
|2014-10-21 16:00:00|       33469|
|2014-10-21 17:00:00|       32557|
|2014-10-21 18:00:00|       27470|
|2014-10-21 19:00:00|       20201|
+-------------------+------------+
only showing top 20 rows
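Note that `from_unixtime` returns a formatted string, not a timestamp type, so `orderBy("date_hour")` sorts text. That still gives chronological order here because the `yyyy-MM-dd HH:mm:ss` layout is zero-padded with the most significant field first. A small pure-Python check of that property, using hypothetical sample values:

```python
from datetime import datetime

# Hypothetical values in the "yyyy-MM-dd HH:mm:ss" string form that from_unixtime returns.
stamps = ["2014-10-22 02:00:00", "2014-10-21 10:00:00", "2014-10-21 09:00:00"]

by_text = sorted(stamps)
by_time = sorted(stamps, key=lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S"))

# Zero-padded, most-significant-first fields make text order equal chronological order.
assert by_text == by_time
print(by_text[0])  # 2014-10-21 09:00:00
```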



### Impressions per hour

In [11]:
dfi = dfs.groupby("hour").count()
dfi.show()

+--------+------+
|    hour| count|
+--------+------+
|14102202|102844|
|14102100|119006|
|14102206|288819|
|14102218|205756|
|14102915|202404|
|14102715|104242|
|14102120|112017|
|14102509|180060|
|14102601| 80863|
|14102712|166229|
|14102321|116964|
|14103010|215813|
|14102812|272124|
|14103016|211916|
|14102110|200028|
|14102204|200948|
|14102512|205914|
|14102822|124195|
|14102209|447783|
|14102722| 94982|
+--------+------+
only showing top 20 rows
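Here each row of `train.gz` is treated as one ad impression, so counting rows per `hour` gives the hourly impressions. The same idea in pure Python, as a sketch with a hypothetical helper name and toy rows:

```python
from collections import Counter


def impressions_per_hour(rows):
    """Count rows per `hour` key, like groupby("hour").count(): one row = one impression."""
    return Counter(row["hour"] for row in rows)


# Hypothetical toy rows, not taken from the Avazu data.
sample = [
    {"hour": "14102100", "click": "0"},
    {"hour": "14102100", "click": "1"},
    {"hour": "14102101", "click": "0"},
]
counts = impressions_per_hour(sample)
print(counts["14102100"], counts["14102101"])  # 2 1
```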



In [12]:
# rename columns
dfi = dfi.toDF('date_hour', 'impressions')
dfi.show()

+---------+-----------+
|date_hour|impressions|
+---------+-----------+
| 14102202|     102844|
| 14102100|     119006|
| 14102206|     288819|
| 14102218|     205756|
| 14102915|     202404|
| 14102715|     104242|
| 14102120|     112017|
| 14102509|     180060|
| 14102601|      80863|
| 14102712|     166229|
| 14102321|     116964|
| 14103010|     215813|
| 14102812|     272124|
| 14103016|     211916|
| 14102110|     200028|
| 14102204|     200948|
| 14102512|     205914|
| 14102822|     124195|
| 14102209|     447783|
| 14102722|      94982|
+---------+-----------+
only showing top 20 rows



In [13]:
# datetime format (YYMMDDHH: MM = month, mm = minutes) and order by date
dfi = (dfi.withColumn('date_hour', from_unixtime(unix_timestamp(dfi.date_hour, 'yyMMddHH')))
          .orderBy("date_hour"))
dfi.show()

+-------------------+-----------+
|          date_hour|impressions|
+-------------------+-----------+
|2014-10-21 00:00:00|     119006|
|2014-10-21 01:00:00|     137442|
|2014-10-21 02:00:00|     207471|
|2014-10-21 03:00:00|     193355|
|2014-10-21 04:00:00|     264711|
|2014-10-21 05:00:00|     273500|
|2014-10-21 06:00:00|     239720|
|2014-10-21 07:00:00|     209311|
|2014-10-21 08:00:00|     207244|
|2014-10-21 09:00:00|     230917|
|2014-10-21 10:00:00|     200028|
|2014-10-21 11:00:00|     175666|
|2014-10-21 12:00:00|     143620|
|2014-10-21 13:00:00|     190481|
|2014-10-21 14:00:00|     174531|
|2014-10-21 15:00:00|     176156|
|2014-10-21 16:00:00|     171869|
|2014-10-21 17:00:00|     171933|
|2014-10-21 18:00:00|     152365|
|2014-10-21 19:00:00|     119775|
+-------------------+-----------+
only showing top 20 rows



----

### Outcome

In [14]:
# write the hourly clicks DataFrame to a Parquet file
dfx.write.parquet("clicks.parquet")

In [15]:
# write the hourly impressions DataFrame to a Parquet file
dfi.write.parquet("impressions.parquet")
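Since both outputs are keyed by the same hour, they can later be combined into an hourly click-through rate (clicks divided by impressions), the quantity this dataset is built around. The sketch below shows that join in pure Python over two hour-keyed mappings; the helper name `ctr_per_hour` is hypothetical, and the two sample numbers are the 2014-10-21 00:00 values from the tables above.

```python
def ctr_per_hour(clicks, impressions):
    """Divide clicks by impressions for every hour present in both mappings."""
    return {
        hour: clicks[hour] / impressions[hour]
        for hour in clicks.keys() & impressions.keys()
        if impressions[hour] > 0  # skip hours with no impressions to avoid division by zero
    }


clicks = {"2014-10-21 00:00:00": 20792}
impressions = {"2014-10-21 00:00:00": 119006}
ctr = ctr_per_hour(clicks, impressions)
print(round(ctr["2014-10-21 00:00:00"], 3))  # 0.175
```

Hours missing from either mapping are dropped, which mirrors an inner join on `date_hour`.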

---