## B) Analyse HTTP Log for User Sessions

### Use Case

Client X would like to measure campaign performance over a given time frame using the HTTP log.

A campaign can generate HTTP requests such as:

* GET request to `/campaigns/{{id}}` (the user views a campaign)
* POST request to `/rewards/{{id}}` (a reward is issued to the user)

A session window for a user is a period of activity (HTTP requests made by that user) in which the gap between consecutive requests is less than 5 minutes.
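A minimal sketch of that rule in plain Python, assuming one user's request timestamps are already available as `datetime` objects. `split_sessions` and `SESSION_GAP` are illustrative names, not part of the original project. Because only gaps of less than 5 minutes keep a session open, a gap of exactly 5 minutes starts a new session.

```python
from datetime import datetime, timedelta

SESSION_GAP = timedelta(minutes=5)


def split_sessions(timestamps, gap=SESSION_GAP):
    """Split one user's request timestamps into (session_start, session_end) pairs.

    A request joins the current session if it arrives less than `gap` after the
    previous request; otherwise (or for the first request) it opens a new session.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] < gap:
            sessions[-1][1] = ts  # extend the current session to this request
        else:
            sessions.append([ts, ts])  # gap too large (or first request): new session
    return [tuple(s) for s in sessions]


t0 = datetime(2019, 8, 27, 0, 0, 0)
print(split_sessions([t0, t0 + timedelta(minutes=4), t0 + timedelta(minutes=10)]))
# two sessions: 00:00 to 00:04, and a single request at 00:10
```

The gap is measured from the previous request rather than from the session start, so a session can last much longer than 5 minutes as long as the user keeps making requests.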

The report must contain the following fields:

* user_id
* session_start (timestamp)
* session_end (timestamp)
* campaigns: list of campaigns the user viewed
* rewards_issued: list of rewards issued to the user
* reward_driven_by_campaign_view: true if, within the session, the user was issued a reward after viewing a campaign that contains that reward
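The `reward_driven_by_campaign_view` rule can be sketched as a single pass over one session's requests in time order. This assumes "after viewing" means a later request in the same session, and that the campaign-to-reward mapping is available as a dict of sets; the function name and the toy mapping are illustrative.

```python
def reward_driven_by_campaign_view(events, campaign_rewards):
    """Return True if a reward is issued after a view of a campaign that contains it.

    events: one session's (resource_type, resource_id) pairs in time order,
            e.g. ("campaigns", 3602) for GET /campaigns/3602 and
            ("rewards", 1429) for POST /rewards/1429.
    campaign_rewards: dict mapping campaign_id -> set of reward_ids.
    """
    eligible = set()  # rewards contained in the campaigns viewed so far
    for resource_type, resource_id in events:
        if resource_type == "campaigns":
            eligible |= campaign_rewards.get(resource_id, set())
        elif resource_type == "rewards" and resource_id in eligible:
            return True
    return False


# Hypothetical mapping and session, for illustration only
mapping = {3602: {7864, 1429, 1380}}
print(reward_driven_by_campaign_view([("campaigns", 3602), ("rewards", 1429)], mapping))  # True
```

Since campaigns and rewards are many-to-many, viewing several campaigns simply unions their reward sets; a reward issued before the matching campaign view does not count.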

### Tasks

1. Create an Avro schema for the HTTP log and convert the provided HTTP log data to Avro format
2. Load the Avro data into a Redshift table
3. Load the campaign-reward mapping CSV into a Redshift table
4. Process the data to produce the report

#### Data Provided

The HTTP log is provided as a text file, `data/http_log.txt`, with one entry per line. The fields of each entry are space-delimited in the following format:

```
timestamp http_method http_path user_id
```

The following list describes the fields of a log entry.

* timestamp: ISO 8601 timestamp (example: `2019-06-01T18:00:00.829+08:00`)
* http_method: the HTTP method (e.g. `GET`, `POST`)
* http_path: the HTTP request path
* user_id: the user identifier
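Each line can be split on whitespace and its timestamp parsed with the standard library. A minimal sketch, assuming every line has exactly these four fields; `LogEntry` and `parse_log_line` are illustrative names.

```python
from datetime import datetime
from typing import NamedTuple


class LogEntry(NamedTuple):
    timestamp: datetime
    http_method: str
    http_path: str
    user_id: str


def parse_log_line(line: str) -> LogEntry:
    """Parse one line of the form: timestamp http_method http_path user_id."""
    ts, method, path, user_id = line.split()  # split() also drops the trailing newline
    # fromisoformat (Python 3.7+) accepts millisecond fractions such as .829
    # and UTC offsets such as +08:00, returning a timezone-aware datetime
    return LogEntry(datetime.fromisoformat(ts), method, path, user_id)


entry = parse_log_line("2019-06-01T18:00:00.829+08:00 GET /campaigns/11 2000020076516\n")
print(entry.http_method, entry.http_path, entry.user_id)
```

`user_id` is kept as a string here, matching the notebook below; a malformed line with more or fewer than four fields raises `ValueError` during unpacking.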

The campaign-to-reward mapping is provided as a CSV file, `data/campaign_reward_mapping.csv`.

Campaigns and rewards have a many-to-many relationship.
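One way to hold a many-to-many mapping in memory is a dict from each campaign to the set of its rewards. A minimal sketch, assuming the CSV has a `campaign_id,reward_id` header (the columns shown when the file is loaded with pandas below); `load_campaign_rewards` is an illustrative name.

```python
import csv
import io
from collections import defaultdict


def load_campaign_rewards(csv_file):
    """Return {campaign_id: set(reward_id)} from a CSV with a campaign_id,reward_id header.

    The relationship is many-to-many: a campaign may appear on several rows,
    and the same reward_id may appear under several campaigns.
    """
    mapping = defaultdict(set)
    for row in csv.DictReader(csv_file):
        mapping[int(row["campaign_id"])].add(int(row["reward_id"]))
    return dict(mapping)


# Usage with an in-memory sample (values copied from the first rows of the mapping file)
sample = io.StringIO("campaign_id,reward_id\n2,1051\n14,1449\n14,566\n")
campaign_rewards = load_campaign_rewards(sample)
```

For the real file, pass an open handle instead, e.g. `open("data/campaign_reward_mapping.csv", newline="")`.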

### Deliverable

The Avro schema must be created manually as an `.avsc` file. All other tasks must be encapsulated in a framework project and shared via a private GitHub repo.
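An `.avsc` file is plain JSON, so one option is to write the schema out once from Python. The schema below is an assumption about what it could look like: the record name `HttpLogEntry`, the namespace `campaign.analytics` and the `timestamp-millis` encoding are choices made here, not given by the brief. Storing the timestamp as a `long` with the `timestamp-millis` logical type, instead of as a string, keeps it sortable and lets Avro readers recover a datetime. The resulting file can be loaded with `fastavro.schema.load_schema`.

```python
import json

# Hypothetical schema for one HTTP log entry
HTTP_LOG_SCHEMA = {
    "type": "record",
    "name": "HttpLogEntry",
    "namespace": "campaign.analytics",
    "fields": [
        # Epoch milliseconds, annotated with the timestamp-millis logical type
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "http_method", "type": "string"},
        {"name": "http_path", "type": "string"},
        {"name": "user_id", "type": "string"},
    ],
}


def write_avsc(path="http_log.avsc"):
    """Write the schema as pretty-printed JSON to an .avsc file and return the path."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(HTTP_LOG_SCHEMA, f, indent=2)
    return path


write_avsc()
```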

Within the framework project, you are free to use any Python library.

Even though the provided dataset is small, code that scales to very large datasets is preferred. Using a parallel processing framework such as Apache Spark or Apache Beam is recommended.


In [None]:
import pandas as pd
import numpy as np
import fastavro
import io
from datetime import datetime, timedelta


In [83]:
# Load CSV into DataFrame
df_http = pd.read_csv('http_log.csv', header=None, names=['timestamp', 'x'])


In [84]:
df_http

| | timestamp | x |
|---|---|---|
| 0 | 2019-08-27T00:07:05+00:00 | POST /rewards/1529 2000020076516 |
| 1 | 2019-08-27T00:08:06+00:00 | POST /rewards/1529 2000020076516 |
| 2 | 2019-08-27T00:05:01+00:00 | POST /rewards/1433 2000042538627 |
| 3 | 2019-08-27T00:05:15+00:00 | POST /rewards/1035 2000039323012 |
| 4 | 2019-08-27T00:06:07+00:00 | POST /rewards/5844 2000037004150 |
| ... | ... | ... |
| 15995 | 2019-08-26T16:32:01+00:00 | GET /campaigns/11 2000027181038 |
| 15996 | 2019-08-26T16:31:48+00:00 | GET /campaigns/3602 1969485 |
| 15997 | 2019-08-26T16:31:43+00:00 | GET /campaigns/3602 2000045480280 |
| 15998 | 2019-08-26T16:32:53+00:00 | GET /campaigns/3602 5724768 |


In [85]:

# Split the 'x' column into separate columns
df_http[['http_method', 'http_path', 'user_id']] = df_http['x'].str.split(' ', n=2, expand=True)

# Drop the original 'x' column
df_http.drop(columns=['x'], inplace=True)

# Convert the 'timestamp' column to datetime format
df_http['timestamp'] = pd.to_datetime(df_http['timestamp'])

# Display the new DataFrame
print(df_http)

                      timestamp http_method        http_path        user_id
0     2019-08-27 00:07:05+00:00        POST    /rewards/1529  2000020076516
1     2019-08-27 00:08:06+00:00        POST    /rewards/1529  2000020076516
2     2019-08-27 00:05:01+00:00        POST    /rewards/1433  2000042538627
3     2019-08-27 00:05:15+00:00        POST    /rewards/1035  2000039323012
4     2019-08-27 00:06:07+00:00        POST    /rewards/5844  2000037004150
...                         ...         ...              ...            ...
15995 2019-08-26 16:32:01+00:00         GET    /campaigns/11  2000027181038
15996 2019-08-26 16:31:48+00:00         GET  /campaigns/3602        1969485
15997 2019-08-26 16:31:43+00:00         GET  /campaigns/3602  2000045480280
15998 2019-08-26 16:32:53+00:00         GET  /campaigns/3602        5724768
15999 2019-08-26 16:33:55+00:00         GET  /campaigns/3602  2000069344719

[16000 rows x 4 columns]




In [86]:
# Extract the resource type ('campaigns' or 'rewards') from the path
df_http['type'] = df_http['http_path'].str.extract(r'/(.*?)\/\d+')


In [87]:
# Extract the numeric resource ID (campaign or reward ID) from the path
df_http['number'] = df_http['http_path'].str.extract(r'/(?P<type>.*?)\/(?P<number>\d+)')['number']
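The two cells above run the pattern twice. A single anchored pattern with named groups returns both pieces at once; in pandas, `str.extract` on such a pattern yields one column per named group. The sketch below shows the same idea with the standard `re` module; `parse_path` is an illustrative name, and converting the ID to `int` is a choice made here so it can be compared directly with the integer `campaign_id`/`reward_id` values in the mapping CSV.

```python
import re

# Anchored via fullmatch: exactly /campaigns/<digits> or /rewards/<digits>
PATH_RE = re.compile(r"/(?P<type>campaigns|rewards)/(?P<number>\d+)")


def parse_path(http_path):
    """Return (type, number) for a campaign or reward path, else None."""
    m = PATH_RE.fullmatch(http_path)
    if m is None:
        return None
    return m.group("type"), int(m.group("number"))


print(parse_path("/campaigns/3602"))
print(parse_path("/rewards/1529"))
```

Using `fullmatch` rejects paths with extra segments, which the unanchored pattern in the cells above would silently accept.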


In [88]:
df_http

| | timestamp | http_method | http_path | user_id | type | number |
|---|---|---|---|---|---|---|
| 0 | 2019-08-27 00:07:05+00:00 | POST | /rewards/1529 | 2000020076516 | rewards | 1529 |
| 1 | 2019-08-27 00:08:06+00:00 | POST | /rewards/1529 | 2000020076516 | rewards | 1529 |
| 2 | 2019-08-27 00:05:01+00:00 | POST | /rewards/1433 | 2000042538627 | rewards | 1433 |
| 3 | 2019-08-27 00:05:15+00:00 | POST | /rewards/1035 | 2000039323012 | rewards | 1035 |
| 4 | 2019-08-27 00:06:07+00:00 | POST | /rewards/5844 | 2000037004150 | rewards | 5844 |
| ... | ... | ... | ... | ... | ... | ... |
| 15995 | 2019-08-26 16:32:01+00:00 | GET | /campaigns/11 | 2000027181038 | campaigns | 11 |
| 15996 | 2019-08-26 16:31:48+00:00 | GET | /campaigns/3602 | 1969485 | campaigns | 3602 |
| 15997 | 2019-08-26 16:31:43+00:00 | GET | /campaigns/3602 | 2000045480280 | campaigns | 3602 |
| 15998 | 2019-08-26 16:32:53+00:00 | GET | /campaigns/3602 | 5724768 | campaigns | 3602 |


## Create Avro Schema for df_http

In [35]:
# pip install fastavro  (fastavro 1.9.4 was used for this notebook)


In [89]:

from fastavro import writer, parse_schema

# Serialise a copy so that df_http keeps its datetime 'timestamp' column,
# which the session analysis below needs for sorting and time differences
df_avro = df_http[['timestamp', 'http_method', 'http_path', 'user_id', 'type', 'number']].copy()

# Convert timestamps to naive UTC, then to strings to match the "string" field below
df_avro['timestamp'] = pd.to_datetime(df_avro['timestamp']).dt.tz_convert(None).astype(str)

# Define Avro schema
avro_schema = {
    "type": "record",
    "name": "HttpEventData",
    "fields": [
        {"name": "timestamp", "type": "string"},
        {"name": "http_method", "type": "string"},
        {"name": "http_path", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "type", "type": "string"},
        {"name": "number", "type": "string"}
    ]
}

# Parse Avro schema
parsed_schema = parse_schema(avro_schema)

# Convert DataFrame rows to Avro records (one dict per row)
avro_records = df_avro.to_dict(orient='records')

# Serialise the records straight into an Avro container file
with open('df_http.avro', 'wb') as f:
    writer(f, parsed_schema, avro_records)

In [12]:
# Read the CSV file into a DataFrame
df_c = pd.read_csv('campaign_reward_mapping.csv')

# Display the DataFrame
print(df_c)

      campaign_id  reward_id
0               2       1051
1              14       1449
2              14        566
3              40       1464
4             140       1369
...           ...        ...
3038         3569       8105
3039         3602       7864
3040         3602       1429
3041         3602       1380
3042           11       8381

[3043 rows x 2 columns]


In [26]:

# Step 1: Identify session windows (a gap of 5 minutes or more starts a new session)
session_threshold = pd.Timedelta(minutes=5)
df_http = df_http.sort_values(by=['user_id', 'timestamp'])
df_http['time_diff'] = df_http.groupby('user_id')['timestamp'].diff()

# True marks the first request of a session; the per-user cumulative sum numbers the sessions
df_http['session_id'] = (df_http['time_diff'] >= session_threshold) | df_http['time_diff'].isnull()
df_http['session_id'] = df_http.groupby('user_id')['session_id'].cumsum()

# Step 2: Group by session and collect the requests (lists keep time order)
df_sessions = df_http.groupby(['user_id', 'session_id']).agg({
    'timestamp': ['min', 'max'],
    'http_path': lambda x: list(x),
    'type': lambda x: list(x),
    'number': lambda x: list(x)
}).reset_index()

df_sessions.columns = ['user_id', 'session_id', 'session_start', 'session_end', 'http_paths', 'types', 'numbers']

# Step 3: Determine rewards driven by campaign views
# campaign_id -> set of reward_ids, from the many-to-many mapping in df_c
campaign_rewards = df_c.groupby('campaign_id')['reward_id'].apply(set).to_dict()

def check_reward_driven(http_paths):
    """True if a POST /rewards/{id} follows, in the same session, a
    GET /campaigns/{cid} whose mapping contains that reward id."""
    eligible = set()
    for path in http_paths:
        kind, _, number = path.strip('/').partition('/')
        if kind == 'campaigns':
            eligible |= campaign_rewards.get(int(number), set())
        elif kind == 'rewards' and int(number) in eligible:
            return True
    return False

df_sessions['reward_driven_by_campaign_view'] = df_sessions['http_paths'].apply(check_reward_driven)

# Final report
report = df_sessions[['user_id', 'session_start', 'session_end', 'http_paths', 'numbers', 'reward_driven_by_campaign_view']]
print(report)

      user_id             session_start               session_end  \
0     1009125 2019-08-27 03:20:49+00:00 2019-08-27 03:20:49+00:00   
1     1014506 2019-08-26 17:25:19+00:00 2019-08-26 17:26:54+00:00   
2     1018910 2019-08-26 19:10:43+00:00 2019-08-26 19:10:43+00:00   
3     1021245 2019-08-27 03:09:59+00:00 2019-08-27 03:09:59+00:00   
4     1055135 2019-08-27 01:00:25+00:00 2019-08-27 01:00:33+00:00   
...       ...                       ...                       ...   
7372   887444 2019-08-26 16:52:36+00:00 2019-08-26 16:52:36+00:00   
7373   898813 2019-08-27 00:57:38+00:00 2019-08-27 00:57:38+00:00   
7374   930183 2019-08-27 01:29:14+00:00 2019-08-27 01:34:00+00:00   
7375   949268 2019-08-26 22:30:42+00:00 2019-08-26 22:31:40+00:00   
7376   951261 2019-08-27 02:34:57+00:00 2019-08-27 02:38:06+00:00   

                                             http_paths  \
0                                     [/campaigns/1127]   
1     [/campaigns/11, /campaigns/11, /campaigns/11, .

In [43]:
report.columns

Index(['user_id', 'session_start', 'session_end', 'http_paths', 'numbers',
       'reward_driven_by_campaign_view'],
      dtype='object')

In [44]:
report = report.rename(columns={'http_paths': 'campaigns', 'numbers': 'campaign_id'})

In [46]:
df_c

| | campaign_id | reward_id |
|---|---|---|
| 0 | 2 | 1051 |
| 1 | 14 | 1449 |
| 2 | 14 | 566 |
| 3 | 40 | 1464 |
| 4 | 140 | 1369 |
| ... | ... | ... |
| 3038 | 3569 | 8105 |
| 3039 | 3602 | 7864 |
| 3040 | 3602 | 1429 |
| 3041 | 3602 | 1380 |


In [48]:
report

| | user_id | session_start | session_end | campaigns | campaign_id | reward_driven_by_campaign_view |
|---|---|---|---|---|---|---|
| 0 | 1009125 | 2019-08-27 03:20:49+00:00 | 2019-08-27 03:20:49+00:00 | [/campaigns/1127] | [1127] | False |
| 1 | 1014506 | 2019-08-26 17:25:19+00:00 | 2019-08-26 17:26:54+00:00 | [/campaigns/11, /campaigns/11, /campaigns/11, ... | [11, 11, 11, 7993] | False |
| 2 | 1018910 | 2019-08-26 19:10:43+00:00 | 2019-08-26 19:10:43+00:00 | [/campaigns/1788] | [1788] | False |
| 3 | 1021245 | 2019-08-27 03:09:59+00:00 | 2019-08-27 03:09:59+00:00 | [/campaigns/1128] | [1128] | False |
| 4 | 1055135 | 2019-08-27 01:00:25+00:00 | 2019-08-27 01:00:33+00:00 | [/campaigns/3602, /campaigns/3602] | [3602, 3602] | False |
| ... | ... | ... | ... | ... | ... | ... |
| 7372 | 887444 | 2019-08-26 16:52:36+00:00 | 2019-08-26 16:52:36+00:00 | [/campaigns/1128] | [1128] | False |
| 7373 | 898813 | 2019-08-27 00:57:38+00:00 | 2019-08-27 00:57:38+00:00 | [/campaigns/1128] | [1128] | False |
| 7374 | 930183 | 2019-08-27 01:29:14+00:00 | 2019-08-27 01:34:00+00:00 | [/campaigns/3569, /campaigns/3569, /campaigns/... | [3569, 3569, 1128, 3602, 3602, 11, 11, 11] | False |
| 7375 | 949268 | 2019-08-26 22:30:42+00:00 | 2019-08-26 22:31:40+00:00 | [/campaigns/1128, /campaigns/3602, /campaigns/... | [1128, 3602, 1788, 31] | False |


In [57]:
report.columns

Index(['user_id', 'session_start', 'session_end', 'campaigns', 'campaign_id',
       'reward_driven_by_campaign_view'],
      dtype='object')

In [66]:
import pandas as pd

# Explode the 'campaign_id' column: one row per requested ID (campaign and reward IDs alike)
df_report_exploded = report.explode('campaign_id')

# The extracted IDs are strings; cast them to int so they match the int64 df_c['campaign_id'] key
df_report_exploded['campaign_id'] = df_report_exploded['campaign_id'].astype(int)

# Merge with df_c on 'campaign_id' to attach reward_id.
# Note: reward_id comes from the campaign -> reward mapping, i.e. the rewards linked to
# each ID in the session, not the rewards actually issued via POST /rewards/{id}.
merged_df = pd.merge(df_report_exploded, df_c, on='campaign_id', how='left')

# Group by session to aggregate campaign IDs and mapped rewards
grouped_df = merged_df.groupby(['user_id', 'session_start', 'session_end']).agg({
    'campaign_id': lambda x: list(set(x)),  # set() removes duplicate IDs
    'reward_id': lambda x: list(x)
}).reset_index()

# Rename columns for clarity
grouped_df = grouped_df.rename(columns={'campaign_id': 'campaigns', 'reward_id': 'rewards_issued'})

# Flag a session if any mapped reward belongs to one of its campaign IDs.
# Because rewards_issued was itself built from the mapping, this is True whenever
# at least one ID in the session has a mapped reward.
def is_reward_driven(row):
    for reward_id in row['rewards_issued']:
        for campaign_id in row['campaigns']:
            if reward_id in df_c[df_c['campaign_id'] == campaign_id]['reward_id'].values:
                return True
    return False

grouped_df['reward_driven_by_campaign_view'] = grouped_df.apply(is_reward_driven, axis=1)

# Reorder columns
grouped_df = grouped_df[['user_id', 'session_start', 'session_end', 'campaigns', 'rewards_issued', 'reward_driven_by_campaign_view']]

print(grouped_df)


      user_id             session_start               session_end  \
0     1009125 2019-08-27 03:20:49+00:00 2019-08-27 03:20:49+00:00   
1     1014506 2019-08-26 17:25:19+00:00 2019-08-26 17:26:54+00:00   
2     1018910 2019-08-26 19:10:43+00:00 2019-08-26 19:10:43+00:00   
3     1021245 2019-08-27 03:09:59+00:00 2019-08-27 03:09:59+00:00   
4     1055135 2019-08-27 01:00:25+00:00 2019-08-27 01:00:33+00:00   
...       ...                       ...                       ...   
7372   887444 2019-08-26 16:52:36+00:00 2019-08-26 16:52:36+00:00   
7373   898813 2019-08-27 00:57:38+00:00 2019-08-27 00:57:38+00:00   
7374   930183 2019-08-27 01:29:14+00:00 2019-08-27 01:34:00+00:00   
7375   949268 2019-08-26 22:30:42+00:00 2019-08-26 22:31:40+00:00   
7376   951261 2019-08-27 02:34:57+00:00 2019-08-27 02:38:06+00:00   

                   campaigns  \
0                     [1127]   
1                 [7993, 11]   
2                     [1788]   
3                     [1128]   

In [67]:
grouped_df

| | user_id | session_start | session_end | campaigns | rewards_issued | reward_driven_by_campaign_view |
|---|---|---|---|---|---|---|
| 0 | 1009125 | 2019-08-27 03:20:49+00:00 | 2019-08-27 03:20:49+00:00 | [1127] | [3930.0, 4222.0, 4597.0, 3927.0, 4948.0, 4597.... | True |
| 1 | 1014506 | 2019-08-26 17:25:19+00:00 | 2019-08-26 17:26:54+00:00 | [7993, 11] | [7522.0, 5656.0, 3136.0, 4516.0, 3828.0, 5655.... | True |
| 2 | 1018910 | 2019-08-26 19:10:43+00:00 | 2019-08-26 19:10:43+00:00 | [1788] | [7137.0, 8032.0, 6897.0, 4489.0, 5283.0, 8022.... | True |
| 3 | 1021245 | 2019-08-27 03:09:59+00:00 | 2019-08-27 03:09:59+00:00 | [1128] | [7140.0, 5331.0, 4658.0, 7329.0, 5844.0, 8040.... | True |
| 4 | 1055135 | 2019-08-27 01:00:25+00:00 | 2019-08-27 01:00:33+00:00 | [3602] | [1022.0, 5644.0, 3234.0, 5014.0, 926.0, 8027.0... | True |
| ... | ... | ... | ... | ... | ... | ... |
| 7372 | 887444 | 2019-08-26 16:52:36+00:00 | 2019-08-26 16:52:36+00:00 | [1128] | [7140.0, 5331.0, 4658.0, 7329.0, 5844.0, 8040.... | True |
| 7373 | 898813 | 2019-08-27 00:57:38+00:00 | 2019-08-27 00:57:38+00:00 | [1128] | [7140.0, 5331.0, 4658.0, 7329.0, 5844.0, 8040.... | True |
| 7374 | 930183 | 2019-08-27 01:29:14+00:00 | 2019-08-27 01:34:00+00:00 | [1128, 3569, 3602, 11] | [8103.0, 8104.0, 8098.0, 8122.0, 8120.0, 8101.... | True |
| 7375 | 949268 | 2019-08-26 22:30:42+00:00 | 2019-08-26 22:31:40+00:00 | [1128, 3602, 1788, 31] | [7140.0, 5331.0, 4658.0, 7329.0, 5844.0, 8040.... | True |


In [78]:
# Filter grouped_df to show rows where reward_driven_by_campaign_view is True
grouped_df_true = grouped_df[grouped_df['reward_driven_by_campaign_view'] == True]

# Display the filtered DataFrame
print(grouped_df_true)

      user_id        session_start          session_end  \
0     1009125  2019-08-27 03:20:49  2019-08-27 03:20:49   
1     1014506  2019-08-26 17:25:19  2019-08-26 17:26:54   
2     1018910  2019-08-26 19:10:43  2019-08-26 19:10:43   
3     1021245  2019-08-27 03:09:59  2019-08-27 03:09:59   
4     1055135  2019-08-27 01:00:25  2019-08-27 01:00:33   
...       ...                  ...                  ...   
7372   887444  2019-08-26 16:52:36  2019-08-26 16:52:36   
7373   898813  2019-08-27 00:57:38  2019-08-27 00:57:38   
7374   930183  2019-08-27 01:29:14  2019-08-27 01:34:00   
7375   949268  2019-08-26 22:30:42  2019-08-26 22:31:40   
7376   951261  2019-08-27 02:34:57  2019-08-27 02:38:06   

                   campaigns  \
0                     [1127]   
1                 [7993, 11]   
2                     [1788]   
3                     [1128]   
4                     [3602]   
...                      ...   
7372                  [1128]   
7373                  [1128]   

In [79]:
# Filter grouped_df to show rows where reward_driven_by_campaign_view is False
grouped_df_false = grouped_df[grouped_df['reward_driven_by_campaign_view'] == False]

# Display the filtered DataFrame
print(grouped_df_false)

            user_id        session_start          session_end     campaigns  \
39          1601063  2019-08-26 23:45:09  2019-08-26 23:45:34  [6873, 4653]   
73          1983147  2019-08-26 22:22:31  2019-08-26 22:22:31        [4655]   
76    2000020076516  2019-08-26 16:03:06  2019-08-26 18:39:36        [1529]   
77    2000020076516  2019-08-26 18:45:07  2019-08-26 20:14:35        [1529]   
78    2000020076516  2019-08-26 20:21:06  2019-08-26 23:54:35        [1529]   
...             ...                  ...                  ...           ...   
7205        5466976  2019-08-27 02:58:24  2019-08-27 02:58:24        [7993]   
7225        5526587  2019-08-27 03:24:39  2019-08-27 03:24:39        [7996]   
7259        5665717  2019-08-27 00:42:42  2019-08-27 00:42:42        [7996]   
7322        5833502  2019-08-26 17:02:04  2019-08-26 17:02:04        [7993]   
7370         739817  2019-08-27 02:06:41  2019-08-27 02:06:41          [60]   


In [80]:
grouped_df_false

| | user_id | session_start | session_end | campaigns | rewards_issued | reward_driven_by_campaign_view |
|---|---|---|---|---|---|---|
| 39 | 1601063 | 2019-08-26 23:45:09 | 2019-08-26 23:45:34 | [6873, 4653] | [nan, nan] | False |
| 73 | 1983147 | 2019-08-26 22:22:31 | 2019-08-26 22:22:31 | [4655] | [nan] | False |
| 76 | 2000020076516 | 2019-08-26 16:03:06 | 2019-08-26 18:39:36 | [1529] | [nan, nan, nan, nan, nan, nan, nan, nan, nan, ... | False |
| 77 | 2000020076516 | 2019-08-26 18:45:07 | 2019-08-26 20:14:35 | [1529] | [nan, nan, nan, nan, nan, nan, nan, nan, nan, ... | False |
| 78 | 2000020076516 | 2019-08-26 20:21:06 | 2019-08-26 23:54:35 | [1529] | [nan, nan, nan, nan, nan, nan, nan, nan, nan, ... | False |
| ... | ... | ... | ... | ... | ... | ... |
| 7205 | 5466976 | 2019-08-27 02:58:24 | 2019-08-27 02:58:24 | [7993] | [nan] | False |
| 7225 | 5526587 | 2019-08-27 03:24:39 | 2019-08-27 03:24:39 | [7996] | [nan] | False |
| 7259 | 5665717 | 2019-08-27 00:42:42 | 2019-08-27 00:42:42 | [7996] | [nan] | False |
| 7322 | 5833502 | 2019-08-26 17:02:04 | 2019-08-26 17:02:04 | [7993] | [nan] | False |


In [69]:
grouped_df['reward_driven_by_campaign_view'].unique()

array([ True, False])

## Save the Report as CSV and Avro

In [70]:

# Save the report as a pipe-delimited ('|') CSV file
grouped_df.to_csv('campaign_performance_report_fix.csv', sep='|', index=False)


In [77]:


# Parse 'session_start' and 'session_end' as timezone-aware UTC datetimes;
# fastavro serialises datetime values for the timestamp-micros logical type
grouped_df['session_start'] = pd.to_datetime(grouped_df['session_start'], utc=True)
grouped_df['session_end'] = pd.to_datetime(grouped_df['session_end'], utc=True)

# Convert 'user_id' to int64; IDs such as 2000020076516 exceed the 32-bit Avro "int" range
grouped_df['user_id'] = grouped_df['user_id'].astype(np.int64)

# Define Avro schema (timestamp-micros annotates a long, not a string)
schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "UserData",
    "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "session_start", "type": {"type": "long", "logicalType": "timestamp-micros"}},
        {"name": "session_end", "type": {"type": "long", "logicalType": "timestamp-micros"}},
        {"name": "campaigns", "type": {"type": "array", "items": "int"}},
        {"name": "rewards_issued", "type": {"type": "array", "items": "double"}},
        {"name": "reward_driven_by_campaign_view", "type": "boolean"}
    ]
}

# Convert DataFrame to Avro records
records = grouped_df.to_dict(orient='records')

# Write the records straight to an Avro container file
with open("campaign_performance_report_fix_avro.avro", "wb") as f:
    fastavro.writer(f, fastavro.parse_schema(schema), records)
