# Introduction

## Problem Statement

## Requirements

- [ ] Extract data directly from the URL
- [ ] Given a CSV file, process the data
- [ ] Load the processed data into another folder
- [ ] Send the data to Power BI
- [ ] Write a single pipeline that extracts data from the URL and uploads the metrics to the dashboard

### Data Processing

- [X] Handle null values
- [X] Correct data types
- [ ] Do basic calculations
- [ ] Make predictions about the future

# Importing Libraries and Packages


In [178]:
import requests
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count
import pyspark.sql.functions as F

import pandas as pd

In [3]:
# Downloading Data

In [214]:
# The files cannot be downloaded directly without Microsoft Office
# authentication, so they were downloaded manually. Microsoft does, however,
# provide an API through which the files could be downloaded, which would
# make a fully automated data pipeline possible.

SHAREPOINT_URL = "https://kaios-my.sharepoint.com/personal/data_bi_kaiostech_com/_layouts/15/onedrive.aspx?originalPath=aHR0cHM6Ly9rYWlvcy1teS5zaGFyZXBvaW50LmNvbS86ZjovZy9wZXJzb25hbC9kYXRhX2JpX2thaW9zdGVjaF9jb20vRXFhb01maGJRR0JFaFZUT3Zwby10MzRCbXBwZmhvdWh4VEsxTFhHWEhnZks5dz9ydGltZT1tQnhDVXhDWjJVZw&id=%2Fpersonal%2Fdata%5Fbi%5Fkaiostech%5Fcom%2FDocuments%2FTest%5Fassignment%5F%2D%5FData%5FAnalyst%5F%2D%5FActive%5FUsers"

URL_ADS_STREAM = "https://kaios-my.sharepoint.com/:x:/r/personal/data_bi_kaiostech_com/_layouts/15/Doc.aspx?sourcedoc=%7BEFE4241F-18C1-4CA3-BA15-CE29A0C36685%7D&file=ads_stream.csv&action=default&mobileredirect=true"

URL_STORE_STREAM = "https://kaios-my.sharepoint.com/:x:/r/personal/data_bi_kaiostech_com/_layouts/15/Doc.aspx?sourcedoc=%7B5F399790-6C00-4F01-B978-A28843F8F413%7D&file=store_stream.csv&action=default&mobileredirect=true"

In [215]:
def download_file(url, filepath):
    # Raise on HTTP error status codes instead of writing the error body to disk
    req = requests.get(url, timeout=60)
    req.raise_for_status()
    with open(filepath, 'wb') as f:
        f.write(req.content)

ADS_STREAM_FILEPATH = "./data/ads_stream.csv"
STORE_STREAM_FILEPATH = "./data/store_stream.csv"
# download_file(URL_ADS_STREAM, ADS_STREAM_FILEPATH)
# download_file(URL_STORE_STREAM, STORE_STREAM_FILEPATH)

In [6]:
# Load Data

In [216]:
# Create a PySpark session
spark = SparkSession.builder.appName('KPI Dashboard').getOrCreate()
spark

In [231]:
df = spark.read.csv(ADS_STREAM_FILEPATH, header=True, inferSchema=True)

In [218]:
df.show()

# Observation:
# 1. For rows with event_type 'logs', ad_id is null.

+--------------------+--------------+--------------------+----------+-------------+-------+
|           device_id|   server_time|               ad_id|event_type|      country|  brand|
+--------------------+--------------+--------------------+----------+-------------+-------+
|c8d146ad02124a3e8...|  2/19/18 2:05|                null|      logs|      Nigeria|Brand E|
|7a8ff5a545534e109...| 6/26/18 10:06|c67ef34fb59e47df9...|      load|      Nigeria|Brand D|
|4dc957af76404c54b...|12/21/18 21:12|                null|      logs|        India|Brand D|
|bb97056dfb3e449ab...| 1/31/18 10:58|                null|      logs|        India|Brand D|
|995f6a50742644f7b...| 5/29/18 21:02|                null|      logs|      Nigeria|Brand A|
|c4c91e7021da40bea...|12/23/18 15:02|e5ae45c4c0ae4ab09...|     close|        India|Brand E|
|8e425095de124f709...| 6/30/18 15:31|e5ae45c4c0ae4ab09...|      load|        India|Brand C|
|cc3bfc7f3ca24f04b...|10/15/18 19:09|                null|      logs|      Niger

In [219]:
df.printSchema()

# Note: server_time is read as a string, so it needs to be converted to a timestamp.

root
 |-- device_id: string (nullable = true)
 |-- server_time: string (nullable = true)
 |-- ad_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- brand: string (nullable = true)



- [X] Check number of samples and columns
- [X] Check for null values
- [X] Check data types
- [X] Check for duplicates
- [X] Convert server_time to timestamp
- [ ] Build a PySpark pipeline

In [232]:
print(f"Total number of rows: {df.count()}")
df.limit(5).toPandas()

Total number of rows: 387134


|   | device_id | server_time | ad_id | event_type | country | brand |
|---|---|---|---|---|---|---|
| 0 | c8d146ad02124a3e8d90ffe80f7fe0fb | 2/19/18 2:05 |  | logs | Nigeria | Brand E |
| 1 | 7a8ff5a545534e1090a89b6778252ed4 | 6/26/18 10:06 | c67ef34fb59e47df9fd7cbbbaebc382f | load | Nigeria | Brand D |
| 2 | 4dc957af76404c54bc591719e27ad648 | 12/21/18 21:12 |  | logs | India | Brand D |
| 3 | bb97056dfb3e449ab089246e56193c28 | 1/31/18 10:58 |  | logs | India | Brand D |
| 4 | 995f6a50742644f7b4c7c4ee51134801 | 5/29/18 21:02 |  | logs | Nigeria | Brand A |


In [233]:
# Count NULL-like values in each column
def check_for_nan(df, subset=None):
    """Return a one-row DataFrame counting, per column, values that are null,
    empty, NaN, or contain the strings 'None', 'NULL' or 'NaN'."""
    if subset:
        df = df.select(subset)
    # Cast every column to string so the same checks apply to all of them
    for column in df.columns:
        df = df.withColumn(column, df['`{}`'.format(column)].cast('string'))
    nan_df = df.select([count(when(col(c).contains('None') |
                                   col(c).contains('NULL') |
                                   col(c).contains('NaN') |
                                   (col(c) == '') |
                                   col(c).isNull() |
                                   isnan(c), c)).alias(c)
                        for c in df.columns])
    return nan_df

nan_df = check_for_nan(df)
nan_df.show()

# Null values appear only in the ad_id column. We suspect they all come from
# rows with event_type 'logs': logs events are written automatically by the
# system, so no ad_id is associated with them.

df_without_logs = df.filter(df.event_type != "logs")
nan_df_without_logs = check_for_nan(df_without_logs)
nan_df_without_logs.show()
# If the hypothesis holds, ad_id reports 0 nulls here. The counts after
# deduplication are consistent with it: 186,100 null ad_id values and
# exactly 186,100 logs events.


+---------+-----------+------+----------+-------+-----+
|device_id|server_time| ad_id|event_type|country|brand|
+---------+-----------+------+----------+-------+-----+
|        0|          0|186104|         0|      0|    0|
+---------+-----------+------+----------+-------+-----+
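The two checks above (count null-like values per column, then confirm that no non-logs row lacks an ad_id) can be sketched without Spark to make the logic explicit. This is a minimal stdlib illustration, not the notebook's PySpark code: rows are assumed to be plain dicts, the null-like markers mirror those in `check_for_nan`, and the helper names and sample rows are hypothetical.

```python
import math

# Markers treated as null-like, mirroring the string checks in check_for_nan
NULL_MARKERS = ("None", "NULL", "NaN")


def is_null_like(value):
    """True for None, float NaN, empty strings, or strings containing a marker."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    text = str(value)
    return text == "" or any(marker in text for marker in NULL_MARKERS)


def count_null_like(rows, columns):
    """Per-column count of null-like values (cf. check_for_nan)."""
    return {c: sum(is_null_like(row.get(c)) for row in rows) for c in columns}


def nulls_only_in_logs(rows):
    """True if no row outside event_type 'logs' has a null-like ad_id."""
    non_logs = [row for row in rows if row["event_type"] != "logs"]
    return count_null_like(non_logs, ["ad_id"])["ad_id"] == 0


# Hypothetical sample rows shaped like ads_stream.csv
rows = [
    {"device_id": "d1", "ad_id": None, "event_type": "logs"},
    {"device_id": "d2", "ad_id": "a1", "event_type": "load"},
    {"device_id": "d3", "ad_id": None, "event_type": "logs"},
    {"device_id": "d4", "ad_id": "a2", "event_type": "close"},
]
print(count_null_like(rows, ["device_id", "ad_id", "event_type"]))
print(nulls_only_in_logs(rows))
```

Filtering out the logs rows first and then counting is the same design as `df_without_logs`: a zero count after the filter is what confirms the hypothesis, whereas equal totals alone only make it plausible.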



                                                                                

In [234]:
# Check for and remove duplicate rows
def remove_duplicates(df):
    distinct_df = df.distinct()
    print(f"Distinct count: {distinct_df.count()}; Total count: {df.count()}")
    return distinct_df

# 4 rows (387,134 - 387,130) are exact duplicates, so we simply drop them.
df = remove_duplicates(df)



Distinct count: 387130; Total count: 387134
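`distinct()` keeps one copy of each fully identical row. A stdlib sketch of the same idea, assuming rows are tuples of column values (the function name `drop_exact_duplicates` and the sample rows are hypothetical), shows how the removed-row count is derived:

```python
def drop_exact_duplicates(rows):
    """Keep the first occurrence of each identical row, preserving input order.

    Spark's distinct() gives no ordering guarantee, but the set of surviving
    rows is the same. None == None in Python, so rows with a missing ad_id
    still compare equal, as nulls do in Spark's distinct().
    """
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


# Hypothetical rows: (device_id, server_time, ad_id, event_type)
rows = [
    ("d1", "2/19/18 2:05", None, "logs"),
    ("d2", "6/26/18 10:06", "a1", "load"),
    ("d1", "2/19/18 2:05", None, "logs"),  # exact duplicate of the first row
]
unique = drop_exact_duplicates(rows)
print(f"Distinct count: {len(unique)}; Total count: {len(rows)}")
print(f"Removed: {len(rows) - len(unique)}")
```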


                                                                                

In [235]:
# Convert server_time from type string to timestamp
from pyspark.sql.functions import to_timestamp
def convert_timestamp(df, timestamp_column, replace_original=True):
    df_with_timestamp = df.withColumn('timestamp', to_timestamp(timestamp_column, "M/d/yy H:mm"))
    if replace_original:
        df_with_timestamp = df_with_timestamp.drop(timestamp_column)
        df_with_timestamp = df_with_timestamp.withColumnRenamed('timestamp', timestamp_column)
    return df_with_timestamp

In [236]:
df = convert_timestamp(df, 'server_time')
df.show()

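# Verify the conversion: to_timestamp yields null for strings that do not match
# the pattern (under Spark's default, non-ANSI settings), so 0 nulls in
# server_time means every value was converted.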
check_for_nan(df).show()

                                                                                

+--------------------+--------------------+----------+-------------+-------+-------------------+
|           device_id|               ad_id|event_type|      country|  brand|        server_time|
+--------------------+--------------------+----------+-------------+-------+-------------------+
|991c614ed150437b8...|                null|      logs|        India|Brand E|2018-12-17 01:43:00|
|644b2ec426874ff88...|                null|      logs|        China|Brand C|2018-01-21 03:58:00|
|c4e6aeb750c24143a...|0e22a9be59d74e82b...|   display|      Nigeria|Brand A|2018-06-03 05:31:00|
|0cafc7e4fddc4ef7b...|68a47cbc12ed45639...|     close|        India|Brand D|2018-11-26 19:14:00|
|121298cc8a0a4dfc8...|                null|      logs|        India|Brand A|2018-10-06 17:20:00|
|ee7c74ba49e64689b...|11a177ea68ba416fa...|      load|United States|Brand D|2018-11-28 13:47:00|
|34d46dc6466e4a4db...|                null|      logs|      Nigeria|Brand A|2018-04-02 14:51:00|
|249d305027ac4102a...|        


+---------+------+----------+-------+-----+-----------+
|device_id| ad_id|event_type|country|brand|server_time|
+---------+------+----------+-------+-----+-----------+
|        0|186100|         0|      0|    0|          0|
+---------+------+----------+-------+-----+-----------+
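The Spark pattern "M/d/yy H:mm" accepts months, days, and hours written without leading zeros, as in 2/19/18 2:05. For spot-checking individual values locally, Python's `datetime.strptime` with "%m/%d/%y %H:%M" accepts the same strings. The sketch below returns None for values that do not parse, loosely mirroring the null that `to_timestamp` produces; the helper name `parse_server_time` is hypothetical.

```python
from datetime import datetime

# Python equivalent of Spark's "M/d/yy H:mm" for these values
SERVER_TIME_FORMAT = "%m/%d/%y %H:%M"


def parse_server_time(value):
    """Parse a server_time string such as '2/19/18 2:05'; return None on failure."""
    try:
        return datetime.strptime(value, SERVER_TIME_FORMAT)
    except (TypeError, ValueError):
        return None


for s in ["2/19/18 2:05", "12/21/18 21:12", "not a date"]:
    print(repr(s), "->", parse_server_time(s))
```

The two-digit year "18" maps to 2018 in both Python's `%y` and Spark's `yy`.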



                                                                                

In [172]:
# Rows where timestamp parsing failed (the empty result below shows there were none)
# df.where(col('timestamp').isNull()).show()


+---------+-----------+-----+----------+-------+-----+---------+
|device_id|server_time|ad_id|event_type|country|brand|timestamp|
+---------+-----------+-----+----------+-------+-----+---------+
+---------+-----------+-----+----------+-------+-----+---------+



In [237]:
# Since we are only interested in monthly and daily statistics, we can drop
# the time of day and keep only the date components.

def break_timestamp_into_year_month_date(df, timestamp_column):
    temp_df = df.withColumn('year', F.year(timestamp_column))
    temp_df = temp_df.withColumn('month', F.month(timestamp_column))
    temp_df = temp_df.withColumn('day', F.dayofmonth(timestamp_column))
    temp_df = temp_df.withColumn('date', F.to_date(timestamp_column))
    temp_df = temp_df.withColumn('year-month', F.date_format(timestamp_column, "yyyy-MM"))
    return temp_df

df = break_timestamp_into_year_month_date(df, 'server_time')
df = df.drop('server_time')
df.show()


                                                                                

+--------------------+--------------------+----------+-------------+-------+----+-----+---+----------+----------+
|           device_id|               ad_id|event_type|      country|  brand|year|month|day|      date|year-month|
+--------------------+--------------------+----------+-------------+-------+----+-----+---+----------+----------+
|991c614ed150437b8...|                null|      logs|        India|Brand E|2018|   12| 17|2018-12-17|   2018-12|
|644b2ec426874ff88...|                null|      logs|        China|Brand C|2018|    1| 21|2018-01-21|   2018-01|
|c4e6aeb750c24143a...|0e22a9be59d74e82b...|   display|      Nigeria|Brand A|2018|    6|  3|2018-06-03|   2018-06|
|0cafc7e4fddc4ef7b...|68a47cbc12ed45639...|     close|        India|Brand D|2018|   11| 26|2018-11-26|   2018-11|
|121298cc8a0a4dfc8...|                null|      logs|        India|Brand A|2018|   10|  6|2018-10-06|   2018-10|
|ee7c74ba49e64689b...|11a177ea68ba416fa...|      load|United States|Brand D|2018|   11| 
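The columns added by `break_timestamp_into_year_month_date` can be reproduced for a single timestamp with the standard library, which may help when validating a few rows by hand. A minimal sketch; the function name `date_parts` is hypothetical, and Spark's "yyyy-MM" corresponds to strftime's "%Y-%m".

```python
from datetime import datetime


def date_parts(ts):
    """Return the fields break_timestamp_into_year_month_date derives from a timestamp."""
    return {
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "date": ts.date(),
        "year-month": ts.strftime("%Y-%m"),  # zero-padded, like Spark's "yyyy-MM"
    }


# Timestamp taken from the second row of the output above
print(date_parts(datetime(2018, 1, 21, 3, 58)))
```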

In [242]:
# Count the rows for each event_type
df.groupBy('event_type').count().show()

# There are 5 event types:
# System event: logs
# User-related events: display, load, click, close

# Of the 4 user-related events, only 2 are driven by user actions:
# click and close.

# Hence, we use only these 2 events to determine whether a user
# actually interacted with an ad.


+----------+------+
|event_type| count|
+----------+------+
|   display| 67068|
|      load| 74388|
|     close| 52236|
|      logs|186100|
|     click|  7338|
+----------+------+
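The event classification can be checked against the counts above. This small sketch copies those counts into a dict (the names `EVENT_COUNTS` and `interaction_flag` are illustrative), applies the same 0/1 interaction rule used in the next cell, and totals the events:

```python
INTERACTIVE_EVENTS = {"click", "close"}

# Counts copied from the groupBy('event_type') output above
EVENT_COUNTS = {"display": 67068, "load": 74388, "close": 52236,
                "logs": 186100, "click": 7338}


def interaction_flag(event_type):
    """1 for user-driven events (click, close), 0 otherwise."""
    return 1 if event_type in INTERACTIVE_EVENTS else 0


total = sum(EVENT_COUNTS.values())
user_events = total - EVENT_COUNTS["logs"]
interactive = sum(n for e, n in EVENT_COUNTS.items() if interaction_flag(e))
print(total, user_events, interactive)
```

The total, 387,130, equals the distinct row count printed after deduplication, so every remaining row falls into one of the five event types. Of the 201,030 user-related events, 59,574 are interactive.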



                                                                                

In [238]:
interactive_events = ['click', 'close']
df = df.withColumn("interaction", F.when(df.event_type.isin(interactive_events), 1).otherwise(0))


In [241]:
df.show()



+--------------------+--------------------+----------+-------------+-------+----+-----+---+----------+----------+-----------+
|           device_id|               ad_id|event_type|      country|  brand|year|month|day|      date|year-month|interaction|
+--------------------+--------------------+----------+-------------+-------+----+-----+---+----------+----------+-----------+
|991c614ed150437b8...|                null|      logs|        India|Brand E|2018|   12| 17|2018-12-17|   2018-12|          0|
|644b2ec426874ff88...|                null|      logs|        China|Brand C|2018|    1| 21|2018-01-21|   2018-01|          0|
|c4e6aeb750c24143a...|0e22a9be59d74e82b...|   display|      Nigeria|Brand A|2018|    6|  3|2018-06-03|   2018-06|          0|
|0cafc7e4fddc4ef7b...|68a47cbc12ed45639...|     close|        India|Brand D|2018|   11| 26|2018-11-26|   2018-11|          1|
|121298cc8a0a4dfc8...|                null|      logs|        India|Brand A|2018|   10|  6|2018-10-06|   2018-10|     

                                                                                

- [ ] Which ad is displayed to the most people?
- [ ] Which ad has the highest CTR?
- [ ] Which brand is most famous?
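For the CTR question, one common definition is clicks divided by impressions. The notebook does not define CTR, so the sketch below assumes that each display event counts as one impression. The function name `ctr_by_ad` and the sample events are hypothetical.

```python
from collections import Counter


def ctr_by_ad(events):
    """Compute clicks / displays per ad_id from (ad_id, event_type) pairs.

    Ads with no display events are skipped to avoid division by zero.
    """
    displays = Counter(ad for ad, ev in events if ev == "display")
    clicks = Counter(ad for ad, ev in events if ev == "click")
    return {ad: clicks[ad] / n for ad, n in displays.items()}


events = [
    ("a1", "display"), ("a1", "display"), ("a1", "click"),
    ("a2", "display"), ("a2", "load"),
    (None, "logs"),  # logs rows carry no ad_id and are ignored
]
ctr = ctr_by_ad(events)
best_ad = max(ctr, key=ctr.get)
print(ctr, best_ad)
```

Counting distinct `device_id`s per ad instead of display events would answer the first question ("displayed to the most people") rather than this one.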

In [239]:
def get_monthly_active_users(df):
    # active_status is 1 if the device had at least one click or close event
    # in that month (in that country), else 0
    monthly_metric = df.groupBy(['device_id', 'year-month', 'year', 'month', 'country']).max('interaction')
    monthly_metric = monthly_metric.sort(["year-month", "device_id", "country"], ascending=True)
    # withColumnRenamed returns a new DataFrame, so the result must be reassigned
    monthly_metric = monthly_metric.withColumnRenamed("max(interaction)", "active_status")
    return monthly_metric

monthly_active_users_df = get_monthly_active_users(df)
monthly_active_users_df.show()



+--------------------+----------+----+-----+-------------+-------------+
|           device_id|year-month|year|month|      country|active_status|
+--------------------+----------+----+-----+-------------+-------------+
|001186a3f903493ab...|   2018-01|2018|    1|United States|            0|
|00136c179d2e4d13a...|   2018-01|2018|    1|      Nigeria|            0|
|00199e866dab478b9...|   2018-01|2018|    1|        India|            0|
|00284be7ffb24ddc9...|   2018-01|2018|    1|      Nigeria|            0|
|003dfc17f325418d8...|   2018-01|2018|    1|        India|            0|
|00438fd08fc543e29...|   2018-01|2018|    1|        India|            0|
|004ae23dcc2147528...|   2018-01|2018|    1|        India|            0|
|004c18ee4b6c4a15a...|   2018-01|2018|    1|United States|            0|
|007c2f93731d4c7fb...|   2018-01|2018|    1|        India|            0|
|00823a8a7b6f4c37a...|   2018-01|2018|    1|        India|            0|
|009d7b1d8fa

                                                                                

In [240]:
filepath = "./data/monthly_active_users"
monthly_active_users_df.coalesce(1).write.options(header='True').csv(filepath)
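The per-device table written above still has to be rolled up into a single monthly active-user count, presumably on the Power BI side. As a sketch of that final step, assuming a device counts as active in a month when its active status there is 1, the stdlib version below first groups rows the way `get_monthly_active_users` does and then counts distinct active devices per month. The function name and sample rows are hypothetical.

```python
from collections import defaultdict


def monthly_active_users(rows):
    """Count distinct active devices per year-month.

    rows: iterable of (device_id, year_month, country, interaction) tuples,
    where interaction is the 0/1 flag built earlier.
    """
    # Step 1: active status = max(interaction) per (device, month, country)
    status = defaultdict(int)
    for device_id, year_month, country, interaction in rows:
        key = (device_id, year_month, country)
        status[key] = max(status[key], interaction)
    # Step 2: count each device at most once per month, even across countries
    active = defaultdict(set)
    for (device_id, year_month, _country), flag in status.items():
        if flag == 1:
            active[year_month].add(device_id)
    return {month: len(devices) for month, devices in sorted(active.items())}


rows = [
    ("d1", "2018-01", "India", 0),
    ("d1", "2018-01", "India", 1),
    ("d2", "2018-01", "Nigeria", 0),
    ("d2", "2018-02", "Nigeria", 1),
    ("d3", "2018-02", "China", 1),
]
print(monthly_active_users(rows))
```

Months with no active device are absent from the result rather than reported as 0. Keeping `country` in the key of step 1 matches the notebook's grouping, while the set in step 2 avoids double-counting a device seen in two countries.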

                                                                                

In [227]:
# Did a device interact on a particular date?
interactions_df = df.groupBy(['device_id', 'date', 'year', 'month', 'day', 'country']).max('interaction')
interactions_df = interactions_df.sort(["year", "month", "day", "device_id"], ascending=[1, 1, 1, 1])
interactions_df.show()

In [230]:
filepath = "./data/processed_ad_stream"
interactions_df.coalesce(1).write.options(header='True').csv(filepath)


                                                                                

In [191]:
interactions_df.tail(1)


                                                                                

[Row(device_id='fffdd8551e52474d886285426eb71732', year=2018, month=12, day=31, country='China', max(interaction)=0)]