In [10]:
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import pandas as pd
import boto3
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *
import psycopg2

In [11]:
# Reuse the active SparkSession if one already exists, otherwise create one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

21/08/15 16:44:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [12]:
def convert(df):
    """Lazily yield each row of ``df`` as a ``{column_name: str(value)}`` dict.

    Rows come out in ``df.index`` order and every cell (numbers, lists,
    timestamps, NaN/None) is passed through ``str``, presumably so the records
    fit an all-string Avro schema — TODO confirm against the .avsc files.
    """
    for label in df.index:
        record = {}
        for name in df.columns:
            # Label-based lookup: `label` always comes from df.index.
            record[name] = str(df[name][label])
        yield record
        

### Get Source File

In [13]:
httplog = 'https://media.githubusercontent.com/media/PerxTech/data-interview/master/data/http_log.txt'
reward_mapping = "https://media.githubusercontent.com/media/PerxTech/data-interview/master/data/campaign_reward_mapping.csv"
# The HTTP log is whitespace-delimited with no header row.
# `sep=r"\s+"` replaces `delim_whitespace=True`, which pandas deprecated in 2.2.
df_httplog = pd.read_csv(httplog, header=None, sep=r"\s+")
# Assigning columns (rather than `names=`) keeps a hard failure if the file
# ever has a different number of fields.
df_httplog.columns = ['timestamp', 'http_method', 'http_path', 'user_id']
df_httplog.head(10)


Unnamed: 0,timestamp,http_method,http_path,user_id
0,2019-08-27T00:07:05+00:00,POST,/rewards/1529,2000020076516
1,2019-08-27T00:08:06+00:00,POST,/rewards/1529,2000020076516
2,2019-08-27T00:05:01+00:00,POST,/rewards/1433,2000042538627
3,2019-08-27T00:05:15+00:00,POST,/rewards/1035,2000039323012
4,2019-08-27T00:06:07+00:00,POST,/rewards/5844,2000037004150
5,2019-08-27T00:05:51+00:00,POST,/rewards/971,2000042538627
6,2019-08-27T00:07:05+00:00,POST,/rewards/7993,511655052
7,2019-08-27T00:00:05+00:00,POST,/rewards/1529,2000020076516
8,2019-08-27T00:01:35+00:00,POST,/rewards/1529,2000020076516
9,2019-08-27T00:03:05+00:00,POST,/rewards/1529,2000020076516


In [14]:

# Campaign -> reward mapping; exact duplicate rows are dropped on load.
df_rewardmap = (
    pd.read_csv(reward_mapping)
    .drop_duplicates()
)
df_rewardmap.head(10)


Unnamed: 0,campaign_id,reward_id
0,2,1051
1,14,1449
2,14,566
3,40,1464
4,140,1369
5,137,1587
6,147,1430
7,5,250
8,204,1784
9,175,1733


### Write data to Avro

In [15]:
avroschema = ['httplog.avsc', 'campaign_reward_mapping.avsc']
avrofile = ['httplog.avro', 'campaign_reward_mapping.avro']
df = [df_httplog, df_rewardmap]


def avrowriter(s, avrofile, df):
    """Serialize a pandas DataFrame into an Avro container file.

    Each row is turned into a dict of stringified values by ``convert`` and
    appended as one Avro record.

    Args:
        s: path to the Avro schema (.avsc) file.
        avrofile: path of the .avro file to create (overwritten if it exists).
        df: pandas DataFrame whose rows become records.
    """
    # Read the schema inside `with` so the file handle is closed promptly.
    with open(s, "rb") as schema_file:
        schema = avro.schema.parse(schema_file.read())
    with open(avrofile, "wb") as out:
        writer = DataFileWriter(out, DatumWriter(), schema)
        try:
            for record in convert(df):
                writer.append(record)
        finally:
            # Close the writer even if an append fails (e.g. a record does not
            # match the schema); `with` guarantees `out` is released too.
            writer.close()


for schema_path, avro_path, frame in zip(avroschema, avrofile, df):
    avrowriter(schema_path, avro_path, frame)
    

### Upload to S3 Bucket

In [23]:
bucket = os.environ["AWS_S3_SCRATCH_SPACE"]
iamrole = os.environ["STS_ROLE_ARN"]


def upload_s3(file, bucket, key=None):
    """Upload a local file to S3.

    Args:
        file: local path of the file to upload.
        bucket: destination bucket name.
        key: object key in the bucket; defaults to ``file`` so existing calls
            keep the same object layout.

    Returns:
        Whatever ``client.upload_file`` returns.
    """
    client = boto3.client(
        's3',
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        # Forward the session token when temporary (STS) credentials are in
        # use; explicit temporary keys are rejected without it. None = unset.
        aws_session_token=os.environ.get("AWS_SESSION_TOKEN"),
    )
    return client.upload_file(file, bucket, file if key is None else key)

In [9]:
# Push every locally written Avro file to the scratch bucket.
for avro_path in avrofile:
    upload_s3(avro_path, bucket)
    print(f"{avro_path} has been successfully uploaded to S3")


### Load to Redshift

In [52]:
def avro_to_redshift(tbl, file):
    """Replace the contents of a Redshift table with an Avro file staged in S3.

    Truncates ``tbl``, then runs ``COPY ... FORMAT AS AVRO 'auto'`` from
    ``s3://{bucket}/{file}`` using the module-level ``bucket`` and ``iamrole``.
    Connection settings come from the AWS_REDSHIFT_* and DATABASE env vars.

    Args:
        tbl: target table name (interpolated verbatim into the SQL).
        file: object key of the Avro file inside ``bucket``.
    """
    conn = psycopg2.connect(
        host=os.environ["AWS_REDSHIFT_HOST"],
        user=os.environ["AWS_REDSHIFT_USER"],
        port=os.environ["AWS_REDSHIFT_PORT"],
        password=os.environ["AWS_REDSHIFT_PASSWORD"],
        dbname=os.environ["DATABASE"],
    )
    # Always release the connection, even if TRUNCATE or COPY raises.
    try:
        with conn.cursor() as cur:
            # NOTE(review): Redshift TRUNCATE commits implicitly, so a failing
            # COPY leaves the table empty; use DELETE FROM if the reload must
            # be atomic.
            cur.execute(f'''truncate table {tbl};''')
            print(f"{tbl} Successfully Truncated")
            # tbl, bucket, file and iamrole are interpolated directly into SQL:
            # they must come from trusted config, never from user input.
            cur.execute(f'''copy {tbl} from 's3://{bucket}/{file}' iam_role '{iamrole}' format as avro 'auto' dateformat 'auto' timeformat 'auto';''')
        conn.commit()
    finally:
        conn.close()
    print(f"{tbl} Successfully Loaded")


# Each table is named after its Avro file, minus the extension.
for avro_path in avrofile:
    avro_to_redshift(os.path.splitext(avro_path)[0], avro_path)

    
    

httplog Successfully Truncated
httplog Successfully Loaded
campaign_reward_mapping Successfully Truncated
campaign_reward_mapping Successfully Loaded


################################################

### Convert pandas DataFrames to Spark DataFrames

In [32]:
# Mapping ids are read as strings so they compare directly with the string
# campaignId extracted from http_path in the join below.
schema = StructType([
    StructField("campaign_id", StringType(), True),
    StructField("reward_id", StringType(), True),
])

sparkDF = spark.createDataFrame(df_httplog)
rewardDF = spark.createDataFrame(df_rewardmap, schema)

rewardDF.show(10)
sparkDF.show(10)


                                                                                

+-----------+---------+
|campaign_id|reward_id|
+-----------+---------+
|          2|     1051|
|         14|     1449|
|         14|      566|
|         40|     1464|
|        140|     1369|
|        137|     1587|
|        147|     1430|
|          5|      250|
|        204|     1784|
|        175|     1733|
+-----------+---------+
only showing top 10 rows

+--------------------+-----------+-------------+-------------+
|           timestamp|http_method|    http_path|      user_id|
+--------------------+-----------+-------------+-------------+
|2019-08-27T00:07:...|       POST|/rewards/1529|2000020076516|
|2019-08-27T00:08:...|       POST|/rewards/1529|2000020076516|
|2019-08-27T00:05:...|       POST|/rewards/1433|2000042538627|
|2019-08-27T00:05:...|       POST|/rewards/1035|2000039323012|
|2019-08-27T00:06:...|       POST|/rewards/5844|2000037004150|
|2019-08-27T00:05:...|       POST| /rewards/971|2000042538627|
|2019-08-27T00:07:...|       POST|/rewards/7993|    511655052|
|2019-08

### Extract campaign ID and reward ID from the request path

In [33]:
# Pull the id out of the request path:
#   GET  /campaigns/<id> -> campaignId (campaign view)
#   POST /rewards/<id>   -> rewardId   (reward issued)
# Non-matching rows get '' (dropped later with array_remove(..., '')).
# regexp_extract anchors on the path prefix and captures the whole id segment.
# The old substring(..., len('http_path')) used the length of the literal
# string 'http_path' (9), so ids longer than 9 characters were truncated.
sparkDF = (
    sparkDF
    .withColumn(
        'campaignId',
        when(col('http_method') == 'GET',
             regexp_extract('http_path', r'^/campaigns/([^/]+)', 1)).otherwise(''),
    )
    .withColumn(
        'rewardId',
        when(col('http_method') == 'POST',
             regexp_extract('http_path', r'^/rewards/([^/]+)', 1)).otherwise(''),
    )
)

sparkDF.show(10)


+--------------------+-----------+-------------+-------------+----------+--------+
|           timestamp|http_method|    http_path|      user_id|campaignId|rewardId|
+--------------------+-----------+-------------+-------------+----------+--------+
|2019-08-27T00:07:...|       POST|/rewards/1529|2000020076516|          |    1529|
|2019-08-27T00:08:...|       POST|/rewards/1529|2000020076516|          |    1529|
|2019-08-27T00:05:...|       POST|/rewards/1433|2000042538627|          |    1433|
|2019-08-27T00:05:...|       POST|/rewards/1035|2000039323012|          |    1035|
|2019-08-27T00:06:...|       POST|/rewards/5844|2000037004150|          |    5844|
|2019-08-27T00:05:...|       POST| /rewards/971|2000042538627|          |     971|
|2019-08-27T00:07:...|       POST|/rewards/7993|    511655052|          |    7993|
|2019-08-27T00:00:...|       POST|/rewards/1529|2000020076516|          |    1529|
|2019-08-27T00:01:...|       POST|/rewards/1529|2000020076516|          |    1529|
|201

In [34]:
# Left join: each campaign view fans out to one row per reward of that campaign;
# rows whose campaignId matches no campaign_id keep a null reward_id.
campaign = sparkDF.join(
    rewardDF,
    on=sparkDF['campaignId'] == rewardDF['campaign_id'],
    how='left',
)
campaign.drop('campaign_id').show(10)


                                                                                

+--------------------+-----------+---------------+-------------+----------+--------+---------+
|           timestamp|http_method|      http_path|      user_id|campaignId|rewardId|reward_id|
+--------------------+-----------+---------------+-------------+----------+--------+---------+
|2019-08-27T01:29:...|        GET| /campaigns/467|      4520293|       467|        |     1249|
|2019-08-27T01:29:...|        GET| /campaigns/467|      4520293|       467|        |     4653|
|2019-08-27T01:29:...|        GET| /campaigns/467|      4520293|       467|        |     4655|
|2019-08-27T01:29:...|        GET| /campaigns/467|      4520293|       467|        |     4654|
|2019-08-27T01:29:...|        GET| /campaigns/467|      4520293|       467|        |     2104|
|2019-08-27T01:29:...|        GET| /campaigns/467|      4520293|       467|        |     2103|
|2019-08-27T03:28:...|        GET|/campaigns/2249|2000063511548|      2249|        |     5255|
|2019-08-27T03:28:...|        GET|/campaigns/2249|

In [35]:
# Parse the ISO-8601 timestamp string and, per user, fetch the previous
# event's time (null for each user's first event).
user_timeline = Window.partitionBy('user_id').orderBy('eventtime')
last_event = (
    campaign
    .withColumn('eventtime', to_timestamp(col('timestamp')))
    .withColumn("last_event", lag('eventtime').over(user_timeline))
)
last_event.select('user_id', 'http_method', 'eventtime', 'last_event').show(100)




+-------+-----------+-------------------+-------------------+
|user_id|http_method|          eventtime|         last_event|
+-------+-----------+-------------------+-------------------+
|2088850|        GET|2019-08-27 11:09:36|               null|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|
|2088850

                                                                                

In [36]:
# Minutes since the user's previous event (null for the first event).
elapsed_seconds = unix_timestamp('eventtime') - unix_timestamp('last_event')
DiffMinutes = last_event.withColumn('DiffMinutes', elapsed_seconds / 60)
DiffMinutes.select(
    'user_id', 'http_method', 'eventtime', 'last_event', 'DiffMinutes'
).show(100)




+-------+-----------+-------------------+-------------------+-----------+
|user_id|http_method|          eventtime|         last_event|DiffMinutes|
+-------+-----------+-------------------+-------------------+-----------+
|2088850|        GET|2019-08-27 11:09:36|               null|       null|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|
|2088850|        GET|2019-08-27 11:09:

                                                                                

In [37]:
# A gap of more than 5 minutes starts a new session; a null gap (first event)
# is flagged 0.
new_session = DiffMinutes.withColumn(
    'is_new_session',
    when(col('DiffMinutes') > 5, 1).otherwise(0),
)
new_session.select(
    'user_id', 'http_method', 'eventtime', 'last_event', 'DiffMinutes', 'is_new_session'
).show(100)


                                                                                

+-------+-----------+-------------------+-------------------+-----------+--------------+
|user_id|http_method|          eventtime|         last_event|DiffMinutes|is_new_session|
+-------+-----------+-------------------+-------------------+-----------+--------------+
|2088850|        GET|2019-08-27 11:09:36|               null|       null|             0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|             0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|             0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|             0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|             0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|             0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|             0|
|2088850|        GET|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|             0|
|2088850|        GET|

In [38]:
# Running count of session boundaries per user gives a 0-based session number.
# `sum` here is the PySpark aggregate (from the functions star import), not the builtin.
session_order = Window.partitionBy('user_id').orderBy('eventtime')
user_session_id = new_session.withColumn(
    "user_session_id",
    sum('is_new_session').over(session_order),
)
user_session_id.show(100)



+--------------------+-----------+---------------+-------+----------+--------+-----------+---------+-------------------+-------------------+-----------+--------------+---------------+
|           timestamp|http_method|      http_path|user_id|campaignId|rewardId|campaign_id|reward_id|          eventtime|         last_event|DiffMinutes|is_new_session|user_session_id|
+--------------------+-----------+---------------+-------+----------+--------+-----------+---------+-------------------+-------------------+-----------+--------------+---------------+
|2019-08-27T03:09:...|        GET|/campaigns/1127|2088850|      1127|        |       1127|     3930|2019-08-27 11:09:36|               null|       null|             0|              0|
|2019-08-27T03:09:...|        GET|/campaigns/1127|2088850|      1127|        |       1127|     4222|2019-08-27 11:09:36|2019-08-27 11:09:36|        0.0|             0|              0|
|2019-08-27T03:09:...|        GET|/campaigns/1127|2088850|      1127|        |  

                                                                                

In [39]:
# One row per (user, session): time bounds plus the sets of viewed campaigns,
# issued rewards, and rewards belonging to the viewed campaigns.
# `min`/`max` are the PySpark aggregates from the functions star import.
# NOTE(review): this rebinds `httplog`, which earlier held the source URL.
session_aggs = [
    min('eventtime').alias('session_start'),
    max('eventtime').alias('session_end'),
    collect_set('campaignId').alias('campaignIds'),
    collect_set('rewardId').alias('rewardIds'),
    collect_set('reward_id').alias('c_rewards'),
]
httplog = user_session_id.groupBy('user_id', 'user_session_id').agg(*session_aggs)




In [40]:
# Drop the '' placeholders, then intersect issued rewards with the rewards of
# campaigns viewed in the same session.
# NOTE(review): event order inside the session is not considered, so a reward
# issued before the campaign view still counts as "driven".
reward_driven = (
    httplog
    .withColumn('campaigns', array_remove('campaignIds', ''))
    .withColumn('rewards_issued', array_remove('rewardIds', ''))
    .withColumn('reward_driven', array_intersect('rewards_issued', 'c_rewards'))
)


In [41]:
# True when at least one issued reward belongs to a campaign viewed in the session.
no_overlap = size('reward_driven') == 0
httplog_final = reward_driven.withColumn(
    'reward_driven_by_campaign_view',
    when(no_overlap, False).otherwise(True),
)

In [42]:
(httplog_final
    .select('user_id', 'session_start', 'session_end', 'campaigns',
            'rewards_issued', 'reward_driven_by_campaign_view')
    .show(100))

                                                                                

+-------------+-------------------+-------------------+--------------------+--------------+------------------------------+
|      user_id|      session_start|        session_end|           campaigns|rewards_issued|reward_driven_by_campaign_view|
+-------------+-------------------+-------------------+--------------------+--------------+------------------------------+
|      2088850|2019-08-27 11:09:36|2019-08-27 11:09:36|              [1127]|            []|                         false|
|      3520005|2019-08-27 00:01:52|2019-08-27 00:02:17|              [2480]|            []|                         false|
|      5598367|2019-08-27 10:33:58|2019-08-27 10:33:58|              [1128]|            []|                         false|
|    502310983|2019-08-27 08:32:21|2019-08-27 08:33:02|              [1128]|            []|                         false|
|    502434808|2019-08-27 09:45:40|2019-08-27 09:45:40|              [1127]|            []|                         false|
|    503594558|2

In [43]:
# Final output columns for the campaign_session_window table.
session_columns = [
    'user_id', 'session_start', 'session_end',
    'campaigns', 'rewards_issued', 'reward_driven_by_campaign_view',
]
campaign_session_window = httplog_final.select(*session_columns)
campaign_session_window.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- session_start: timestamp (nullable = true)
 |-- session_end: timestamp (nullable = true)
 |-- campaigns: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- rewards_issued: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- reward_driven_by_campaign_view: boolean (nullable = false)



### Show campaign-driven sessions

In [44]:
# Keep only sessions where a reward was driven by a campaign view.
# Filtering on the boolean column directly replaces the redundant `== True`;
# nulls are excluded either way.
_reward_driven = httplog_final.where(col('reward_driven_by_campaign_view'))

In [45]:
(_reward_driven
    .select('user_id', 'campaigns', 'rewards_issued', 'c_rewards')
    .show(10))

                                                                                

+-------------+--------------------+--------------+--------------------+
|      user_id|           campaigns|rewards_issued|           c_rewards|
+-------------+--------------------+--------------+--------------------+
|2000058401581|              [1128]|          [60]|[7143, 5513, 4949...|
|2000072388039|[3569, 32, 11, 42...|  [8248, 3210]|[1380, 3806, 3150...|
|    500464221|              [1128]|          [60]|[7143, 5513, 4949...|
|2000067271440|              [1128]|          [60]|[7143, 5513, 4949...|
|      4751302|              [1127]|        [8039]|[8256, 5847, 3927...|
|2000062111464|              [1128]|        [7149]|[7143, 5513, 4949...|
|2000072285185|                [31]|  [1200, 7986]|[4519, 5812, 1138...|
|2000067173658|                [11]|        [8249]|[6697, 7141, 2459...|
|      3606322|              [1128]|        [7149]|[7143, 5513, 4949...|
|2000066874437|          [31, 1128]|          [60]|[5847, 4519, 4223...|
+-------------+--------------------+--------------+

### Convert the Spark DataFrame to pandas

In [46]:
# Collect the session-level result to the driver as a pandas DataFrame.
pdCampaign_session_window = (
    campaign_session_window.toPandas()
)

                                                                                

In [47]:
pdCampaign_session_window.head(n=10)

Unnamed: 0,user_id,session_start,session_end,campaigns,rewards_issued,reward_driven_by_campaign_view
0,2088850,2019-08-27 11:09:36,2019-08-27 11:09:36,[1127],[],False
1,3520005,2019-08-27 00:01:52,2019-08-27 00:02:17,[2480],[],False
2,5598367,2019-08-27 10:33:58,2019-08-27 10:33:58,[1128],[],False
3,502310983,2019-08-27 08:32:21,2019-08-27 08:33:02,[1128],[],False
4,502434808,2019-08-27 09:45:40,2019-08-27 09:45:40,[1127],[],False
5,503594558,2019-08-27 10:30:48,2019-08-27 10:30:48,[1128],[],False
6,504136952,2019-08-27 09:05:18,2019-08-27 09:05:18,[1128],[],False
7,506202909,2019-08-27 09:55:03,2019-08-27 09:55:03,[3635],[],False
8,510565993,2019-08-27 01:16:23,2019-08-27 01:16:23,[1128],[],False
9,511512712,2019-08-27 01:38:43,2019-08-27 01:38:43,[1128],[],False


### Write to Avro File

In [50]:
# Reuse the same Avro writer for the session-level output.
avroschema2, avrofile2 = 'campaign_session_window.avsc', 'campaign_session_window.avro'

avrowriter(avroschema2, avrofile2, pdCampaign_session_window)

### Upload to S3 Bucket

In [66]:
upload_s3(file=avrofile2, bucket=bucket)

### Load to Redshift

In [51]:
# Table name = file name without its extension (instead of slicing off a
# hard-coded 5 characters, which breaks for any other extension).
table2 = os.path.splitext(avrofile2)[0]
avro_to_redshift(table2, avrofile2)

campaign_session_window Successfully Truncated
campaign_session_window Successfully Loaded
