In [1]:
import pandas as pd
import numpy as np
import os
import datetime
import pandasql
import json
from tasks import read_impression_file, process_impression_file, load_raw_impressions, process_click_file, load_clicks, process_deduped_impressions, load_deduped_impressions, attribute_impressions, load_impressions

In [2]:
from models import Impression, Revenue

In [4]:
# Raw impressions input; the path is relative to the notebook's working directory.
IMPRESSIONS_PATH = 'resources/impressions.json'
process_impression_file(IMPRESSIONS_PATH)

In [5]:
# Raw clicks input; the path is relative to the notebook's working directory.
CLICKS_PATH = 'resources/clicks.json'
process_click_file(CLICKS_PATH)

In [6]:
# Project pipeline step from `tasks`. It presumably deduplicates the impressions
# ingested by process_impression_file above — TODO confirm in tasks.py.
process_deduped_impressions()

In [7]:
# Load the deduplicated impressions as a DataFrame (presumably the output of
# process_deduped_impressions() above — confirm in tasks.py).
deduped_impression_df = load_deduped_impressions()

In [8]:
# Load the processed clicks (presumably what process_click_file() stored — confirm in tasks.py).
click_df = load_clicks()

In [9]:
# Attribute clicks to the deduplicated impressions. No output is recorded for this
# cell, so the result presumably persists as a side effect that load_impressions()
# reads back — TODO confirm in tasks.py.
attribute_impressions(deduped_impression_df, click_df)

In [10]:
# Attributed impressions. This is kept as a notebook global because the pandasql
# queries below are run with globals() and presumably reference `impressions_df`
# by name — confirm against resources/*.ql.
impressions_df = load_impressions()

In [11]:
# Load the SQLite/pandasql metrics query. It is expected to yield app_id,
# country_code, impressions, clicks and revenue columns, which the Impression
# conversion below reads.
# The handle is named `query_file`, not `input`, so the built-in input() is not
# shadowed for the rest of the kernel session.
with open("resources/calculate_metrics.ql", "r", encoding="utf-8") as query_file:
    query = query_file.read()

In [12]:
# Run the metrics SQL through pandasql (in-memory SQLite). Table names in the
# query are resolved against the notebook's global namespace.
output_df = pandasql.sqldf(query, env=globals())

In [13]:
# Convert each aggregated metrics row into an `Impression` and keep its attribute
# dict, producing a list of plain dicts for json serialisation.
# itertuples() is much faster than iterrows(). Unlike iterrows(), it never
# upcasts ints to float when every column is numeric, and the index is unused.
output_impressions = [
    vars(Impression(row.app_id, row.country_code, row.impressions, row.clicks, row.revenue))
    for row in output_df.itertuples(index=False)
]

In [14]:
# Serialise before opening the file. Opening with 'w' truncates immediately, so a
# serialisation error (e.g. a value json cannot encode) would otherwise leave an
# empty output.json behind.
impressions_json = json.dumps(output_impressions)
with open('output.json', 'w') as output:
    output.write(impressions_json)

In [15]:
# Load the pandasql/SQLite recommendation query. Its result is read below as
# app_id, country_code and recommended_advertiser_ids.
# The handle is named `query_file`, not `input`, so the built-in input() is not
# shadowed for the rest of the kernel session.
with open("resources/recommend_ad.ql", "r", encoding="utf-8") as query_file:
    query = query_file.read()

In [16]:
# Run the recommendation SQL through pandasql (in-memory SQLite). Table names in
# the query are resolved against the notebook's global namespace.
top_5_df = pandasql.sqldf(query, env=globals())

In [17]:
# Preview only the first rows instead of rendering the whole frame. The complete
# result is serialised to output_revenue_pandas.json further down.
top_5_df.head(10)

Unnamed: 0,app_id,country_code,recommended_advertiser_ids
0,0,IT,22
1,1,IT,7
2,4,IT,15
3,4,UK,32
4,5,DE,21
5,5,IT,12
6,5,US,1025
7,6,DE,15
8,6,US,1631
9,7,DE,20


In [18]:
# Convert each recommendation row into a `Revenue` record and keep its attribute
# dict, producing a list of plain dicts for json serialisation.
# itertuples() is much faster than iterrows() and never upcasts ints to float;
# the index is unused.
output_revenue = [
    vars(Revenue(row.app_id, row.country_code, row.recommended_advertiser_ids))
    for row in top_5_df.itertuples(index=False)
]

In [19]:
# Serialise before opening the file. Opening with 'w' truncates immediately, so a
# serialisation error would otherwise leave an empty output file behind.
revenue_pandas_json = json.dumps(output_revenue)
with open('output_revenue_pandas.json', 'w') as output:
    output.write(revenue_pandas_json)

In [20]:
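# Disabled exploratory check: for each (app_id, country_code, advertiser_id), the count
# of non-null revenue values and their sum. Kept for reference only.
#impressions_df.groupby(['app_id', 'country_code', 'advertiser_id']).agg({'revenue' : ['count', 'sum']})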

In [21]:
from pyspark.sql import SparkSession

In [22]:
# Reuse the active Spark session if there is one; otherwise start one named "Impression Rate".
spark = (
    SparkSession.builder
    .appName("Impression Rate")
    .getOrCreate()
)

22/02/16 15:58:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [23]:
# Expose the parquet impressions to Spark SQL as the temp view `impressions_df`.
# This view is distinct from the pandas DataFrame of the same name.
impressions_sdf = spark.read.parquet('data/impressions')
impressions_sdf.createOrReplaceTempView('impressions_df')

                                                                                

In [24]:
# Number of advertisers to recommend per (app_id, country_code).
TOP_N_ADVERTISERS = 5

# Spark SQL: within each (app_id, country_code), rank advertisers by revenue per
# impression and keep the best TOP_N_ADVERTISERS as a list ordered best-first.
# - Rows whose app_id or advertiser_id is -1 are excluded; the `!=` filter also
#   drops NULL ids.
# - NULL revenue counts as 0, so impressions without revenue still lower the rate.
# - advertiser_id breaks ties in the ranking, so the selection is deterministic.
# - collect_list() gives no ordering guarantee after the shuffle, so the
#   (rn, advertiser_id) pairs are sorted by rn before projecting out the ids.
query = f"""
WITH grouped_impressions AS (
    SELECT
        app_id,
        country_code,
        advertiser_id,
        COUNT(1) AS impressions,
        SUM(COALESCE(revenue, 0)) AS revenue
    FROM impressions_df
    WHERE app_id != -1 AND advertiser_id != -1
    GROUP BY 1, 2, 3
),
revenue_rate_data AS (
    SELECT app_id, country_code, advertiser_id, revenue / impressions AS revenue_rate
    FROM grouped_impressions
),
revenue_rate_data_with_rn AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY app_id, country_code
            ORDER BY revenue_rate DESC, advertiser_id
        ) AS rn
    FROM revenue_rate_data
)
SELECT
    CAST(app_id AS INT) AS app_id,
    country_code,
    transform(
        sort_array(collect_list(struct(rn, advertiser_id))),
        x -> x.advertiser_id
    ) AS recommended_advertiser_ids
FROM revenue_rate_data_with_rn
WHERE rn <= {TOP_N_ADVERTISERS}
GROUP BY app_id, country_code
ORDER BY app_id, country_code
"""

In [25]:
# Execute the Spark query, then collect every result row to the driver as a
# pandas DataFrame (the whole result must fit in driver memory).
top_5_sdf = spark.sql(query)
top_5_df = top_5_sdf.toPandas()

                                                                                

In [26]:
class Revenue:
    """Plain record for one (app_id, country_code) advertiser recommendation.

    NOTE(review): this redefines the ``Revenue`` imported from ``models`` in an
    earlier cell. Every later use of ``Revenue`` in this kernel therefore gets this
    class. Confirm the two are interchangeable and keep only one.

    Downstream cells serialise the instance ``__dict__``, so the attribute names
    and their insertion order form the output schema.
    """

    def __init__(self, app_id, country_code, recommended_advertiser_ids):
        # Tuple assignment binds targets left to right, which keeps the
        # attribute insertion order (and thus the JSON key order) stable.
        self.app_id, self.country_code, self.recommended_advertiser_ids = (
            app_id,
            country_code,
            recommended_advertiser_ids,
        )

In [27]:
# Convert each Spark recommendation row into a `Revenue` record and keep its
# attribute dict, producing a list of plain dicts for json serialisation.
# itertuples() is much faster than iterrows() and never upcasts ints to float;
# the index is unused.
output_revenue = [
    vars(Revenue(row.app_id, row.country_code, row.recommended_advertiser_ids))
    for row in top_5_df.itertuples(index=False)
]

In [28]:
# Serialise before opening the file. Opening with 'w' truncates immediately, so a
# serialisation error (e.g. a numpy array json cannot encode) would otherwise
# leave an empty output_revenue.json behind.
revenue_json = json.dumps(output_revenue)
with open('output_revenue.json', 'w') as output:
    output.write(revenue_json)