# VERVE CODING CHALLENGE

### Importing useful Libraries

In [78]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from urllib.request import urlopen

### Starting Spark Session with 8 cores

In [79]:
spark = SparkSession.builder \
    .master("local[8]") \
    .appName("VerveCodingChallenge") \
    .config("spark.some.config.option", "vcc") \
    .getOrCreate()

# QUESTION 1:

## Reading events stored in JSON files

### Loading the URL JSON Dataset file path

In [80]:
path_1 = "https://gist.githubusercontent.com/mpasa/9a710abe1f93335fa00dddae3d6e9401/raw/b573f05afc5ee0915ba1b0c3d56734d571fe3a56/clicks.json"
path_2 = "https://gist.githubusercontent.com/mpasa/9a710abe1f93335fa00dddae3d6e9401/raw/b573f05afc5ee0915ba1b0c3d56734d571fe3a56/impressions.json"

### Reading and parsing the JSON files into Spark DataFrame

In [81]:
def read_json(p1, p2):
  clicks_json = (urlopen(path_1).read().decode('utf-8'))
  clicks_rdd = spark.sparkContext.parallelize([clicks_json])
  clicks = spark.read.json(clicks_rdd)

  impressions_json = (urlopen(path_2).read().decode('utf-8'))
  impressions_rdd = spark.sparkContext.parallelize([impressions_json])
  impressions = spark.read.json(impressions_rdd)

  return clicks, impressions

In [82]:
clicks, impressions = read_json(path_1, path_2)
clicks.show(5), impressions.show(5)

+--------------------+------------------+
|       impression_id|           revenue|
+--------------------+------------------+
|97dd2a0f-6d42-4c6...| 2.091225600111518|
|43bd7feb-3fea-40b...|2.4794577548980876|
|1b04c706-e3d7-4f7...|1.0617394700223026|
|31214d91-d950-446...| 2.074762244925742|
|fbb52038-4db1-46d...|2.7837584244414537|
+--------------------+------------------+
only showing top 5 rows

+-------------+------+------------+--------------------+
|advertiser_id|app_id|country_code|                  id|
+-------------+------+------------+--------------------+
|            8|    32|          UK|a39747e8-9c58-41d...|
|           17|    30|        null|5deacf2d-833a-454...|
|           15|     4|          IT|2ae9fd3f-4c70-4d9...|
|           20|    22|          IT|fbb52038-4db1-46d...|
|           32|     9|            |b15449b6-14c9-406...|
+-------------+------+------------+--------------------+
only showing top 5 rows



(None, None)

# QUESTION 2:

## Calculating metrics for some dimensions

### MERGING THE DATASETS

In order to effectively perform the needed operation on the Datasets, the need to create a single view of the two tables is inevitable.

Doing this will give the ease to pass queries for the needed metrics.

In [83]:
df = impressions.join(clicks,impressions.id == clicks.impression_id, "inner")
df.show(5)

+-------------+------+------------+--------------------+--------------------+------------------+
|advertiser_id|app_id|country_code|                  id|       impression_id|           revenue|
+-------------+------+------------+--------------------+--------------------+------------------+
|           27|    30|          DE|8a391b7a-219e-4dc...|8a391b7a-219e-4dc...|1.9016930019722926|
|           27|    30|          DE|8a391b7a-219e-4dc...|8a391b7a-219e-4dc...|1.9016930019722926|
|           27|    30|          DE|8a391b7a-219e-4dc...|8a391b7a-219e-4dc...|1.9016930019722926|
|           27|    30|          DE|8a391b7a-219e-4dc...|8a391b7a-219e-4dc...|1.9016930019722926|
|           27|    30|          DE|8a391b7a-219e-4dc...|8a391b7a-219e-4dc...|1.9016930019722926|
+-------------+------+------------+--------------------+--------------------+------------------+
only showing top 5 rows



### Getting the required metrics

In [84]:
metricsOnDimension = df \
                        .groupBy(["app_id", "country_code"]) \
                        .agg\
                        (
                             F.count("id").alias("impressions"),
                             F.countDistinct("revenue").alias("clicks"),
                             F.sum("revenue").alias("revenue")
                        ) \
                        .filter("country_code is not null") \
                        .sort(F.col("app_id").cast("int"))

In [85]:
print(metricsOnDimension.count())

43


### Format conversion for ease of aceess

In [86]:
type(metricsOnDimension)
metricsOnDimension_pd = metricsOnDimension.toPandas()

In [87]:
metricsOnDimension_pd.head(5)

Unnamed: 0,app_id,country_code,impressions,clicks,revenue
0,1,IT,21,1,48.09064
1,2,,62,2,77.821671
2,4,IT,46,2,101.783842
3,5,IT,32,2,82.184541
4,5,DE,21,1,16.562941


### Outputting the Result as a JSON file

In [88]:
sourceFile_2 = open('Question_2_result_output.json', 'w')
for index, item in metricsOnDimension_pd.iterrows():
    # ad_id.append(item[1])
    print("\n\t{\n\t\t\"app_id\":", item["app_id"], "\n \t\t\"country_code\":", item["country_code"], "\n \t\t\"impressions\":", item["impressions"],  "\n \t\t\"clicks\":", item["clicks"],  "\n \t\t\"revenue\":", item["revenue"],"\n\t},", file = sourceFile_2)
sourceFile_2.close()

In [89]:
%cat Question_2_result_output.json > "/home/computech/PycharmProjects/VCC/venv/vccarena/Q2_output.json"
%cat Question_2_result_output.json


	{
		"app_id": 1 
 		"country_code": IT 
 		"impressions": 21 
 		"clicks": 1 
 		"revenue": 48.09063975343832 
	},

	{
		"app_id": 2 
 		"country_code":  
 		"impressions": 62 
 		"clicks": 2 
 		"revenue": 77.82167117446404 
	},

	{
		"app_id": 4 
 		"country_code": IT 
 		"impressions": 46 
 		"clicks": 2 
 		"revenue": 101.78384185682643 
	},

	{
		"app_id": 5 
 		"country_code": IT 
 		"impressions": 32 
 		"clicks": 2 
 		"revenue": 82.18454096576062 
	},

	{
		"app_id": 5 
 		"country_code": DE 
 		"impressions": 21 
 		"clicks": 1 
 		"revenue": 16.562940714608928 
	},

	{
		"app_id": 6 
 		"country_code":  
 		"impressions": 60 
 		"clicks": 2 
 		"revenue": 83.23202626929094 
	},

	{
		"app_id": 7 
 		"country_code": IT 
 		"impressions": 10 
 		"clicks": 1 
 		"revenue": 9.348166491994709 
	},

	{
		"app_id": 7 
 		"country_code": DE 
 		"impressions": 32 
 		"clicks": 1 
 		"revenue": 11.411954343923577 
	},


# QUESTION 3:

### Making recommendation for top 5 advertiser per 'app_id', per 'country_code'

### Performing the needed operation

In [90]:
ad_recommendation = df \
                    .groupBy("app_id", "country_code", "advertiser_id") \
                    .agg(F.count("id").alias("impressions"), F.countDistinct("revenue").alias("clicks"), F.sum("revenue").alias("revenue")) \
                    .filter("country_code is not null") \
                    .sort(F.col("app_id").cast("int")) \
                    .groupBy("app_id", "country_code", "advertiser_id") \
                    .agg(F.max("revenue").alias("Total Revenue"))

ad_recommendation.count(), ad_recommendation.show(5)

+------+------------+-------------+------------------+
|app_id|country_code|advertiser_id|     Total Revenue|
+------+------------+-------------+------------------+
|    13|          US|           12| 85.98714294381844|
|    23|          US|           17|25.078424056691844|
|     9|          US|            8| 80.90569408371147|
|    15|            |           27| 63.52933382616075|
|    26|          DE|           16|44.467305314479475|
+------+------------+-------------+------------------+
only showing top 5 rows



(46, None)

### Format conversion for ease of access

In [91]:
ad_recommendation_pd = ad_recommendation.toPandas()
ad_recommendation_pd.head(3)

Unnamed: 0,app_id,country_code,advertiser_id,Total Revenue
0,13,US,12,85.987143
1,23,US,17,25.078424
2,9,US,8,80.905694


### Writing Output as JSON file

In [92]:
us_ads = []
uk_ads = []
it_ads = []
de_ads = []
others = []

for index, items in ad_recommendation_pd.iterrows():

    if items['country_code'] == 'US': us_ads.append(items['advertiser_id'])
    elif items['country_code'] == 'UK': uk_ads.append(items['advertiser_id'])
    elif items['country_code'] == 'IT': it_ads.append(items['advertiser_id'])
    elif items['country_code'] == 'DE': de_ads.append(items['advertiser_id'])
    else: others.append(items['advertiser_id'])



In [93]:
sourceFile_3 = open('Question_3_result_output.json', 'w')
for index, ids in ad_recommendation_pd.iterrows():
    if ids["country_code"] == 'US': print("\n\t{\n\t\t\"app_id\":", ids["app_id"], "\n \t\t\"country_code\":", ids["country_code"], "\n \t\t\"recommended_advertiser_id\":", us_ads[:5], "\n\t},", file = sourceFile_3)
    elif ids["country_code"] == 'UK': print("\n\t{\n\t\t\"app_id\":", ids["app_id"], "\n \t\t\"country_code\":", ids["country_code"], "\n \t\t\"recommended_advertiser_id\":", uk_ads[:5], "\n\t},", file = sourceFile_3)
    elif ids["country_code"] == 'IT': print("\n\t{\n\t\t\"app_id\":", ids["app_id"], "\n \t\t\"country_code\":", ids["country_code"], "\n \t\t\"recommended_advertiser_id\":", it_ads[:5], "\n\t},", file = sourceFile_3)
    elif ids["country_code"] == 'DE': print("\n\t{\n\t\t\"app_id\":", ids["app_id"], "\n \t\t\"country_code\":", ids["country_code"], "\n \t\t\"recommended_advertiser_id\":", de_ads[:5], "\n\t},", file = sourceFile_3)
    else: print("\n\t{\n\t\t\"app_id\":", ids["app_id"], "\n \t\t\"country_code\":", ids["country_code"], "\n \t\t\"recommended_advertiser_id\":", others[:5], "\n\t},", file = sourceFile_3)
sourceFile_3.close()

In [94]:
%ls
%cd /tmp

 [0m[01;34mblockmgr-a4000823-c856-4bc2-839b-ba7b9f5ea7c6[0m/
 [01;34mblockmgr-e1b9ff69-31a1-4685-91be-9588d91367b2[0m/
[01;34m'Bridge To Kubernetes'[0m/
 config-err-cnV85d
 [01;34mhsperfdata_computech[0m/
 [01;34mhsperfdata_root[0m/
 idea.sbt
 [01;34mintellij-scala-trace-logger[0m/
 juju-mk0fa45cb1ce4f94e72bc1914d8a34d48af92801
 juju-mk5de22c3aec3ce5243c7c94cf68c5a7a1ae698f
 juju-mkaf09319d9954fa102c931ecbd41944d9dd29b9
 [01;34mkafka-logs[0m/
 kotlin-idea-9547162286627969969-is-running
 liblz4-java-14608291342448917710.so
 liblz4-java-14608291342448917710.so.lck
 liblz4-java-20064901511103493.so
 liblz4-java-20064901511103493.so.lck
 migrate_db
 minikube_update-check_b107324af521d9e2e50fd3fe6194bb47b4fb5279_0.log
 minikube_version_93e4805e095a4d039cc9de3d9ca359634884ef5c_0.log
 model.model.ce94127b
 [01;34mmozilla_computech0[0m/
 [01;34mQ2_output.json[0m/
 qtsingleapp-RStudi-c679-3e8-lockfile
 Question_2_result_output.json
 Question_3_result_

In [95]:
%cat Question_3_result_output.json > "/home/computech/PycharmProjects/VCC/venv/vccarena/Q3_output.json"
%cat Question_3_result_output.json


	{
		"app_id": 13 
 		"country_code": US 
 		"recommended_advertiser_id": [12, 17, 8, 5, 9] 
	},

	{
		"app_id": 23 
 		"country_code": US 
 		"recommended_advertiser_id": [12, 17, 8, 5, 9] 
	},

	{
		"app_id": 9 
 		"country_code": US 
 		"recommended_advertiser_id": [12, 17, 8, 5, 9] 
	},

	{
		"app_id": 15 
 		"country_code":  
 		"recommended_advertiser_id": [27, 12, 7, 7, 23] 
	},

	{
		"app_id": 26 
 		"country_code": DE 
 		"recommended_advertiser_id": [16, 14, 22, 10, 27] 
	},

	{
		"app_id": 40 
 		"country_code": DE 
 		"recommended_advertiser_id": [16, 14, 22, 10, 27] 
	},

	{
		"app_id": 25 
 		"country_code":  
 		"recommended_advertiser_id": [27, 12, 7, 7, 23] 
	},

	{
		"app_id": 2 
 		"country_code":  
 		"recommended_advertiser_id": [27, 12, 7, 7, 23] 
	},

	{
		"app_id": 39 
 		"country_code": US 
 		"recommended_advertiser_id": [12, 17, 8, 5, 9] 
	},

	{
		"app_id": 30 
 		"country_code": US 
 		"recommended_

### Thank you for reading thus far