# Install requirements

In [None]:
%pip install -r requirements.txt

# Obtain input data

In [None]:
import os 

from challenge import AzureStorageConnector, TEMP_PATH

In [None]:
# Fetch the storage connection string from the Databricks secret scope and use
# the project connector to pull the raw inputs. The next cell reads them from
# TEMP_PATH, so get_file presumably stores them there (see challenge module).
conn_string = dbutils.secrets.get(scope="storage-secret-scope", key="storage_conn_string")
storage_connector = AzureStorageConnector(conn_string)

INPUT_FILES = ("ads.json", "views.json")
for input_file in INPUT_FILES:
    storage_connector.get_file(input_file)

In [None]:
# Load both JSON inputs from the driver-local temp directory (file:// scheme),
# ads first, then views.
ads_df, views_df = (
    spark.read.json(f"file://{TEMP_PATH}/{dataset}.json")
    for dataset in ("ads", "views")
)


In [None]:
# Preview the raw ads (PySpark's default: 20 rows, truncated cells).
ads_df.show(n=20, truncate=True)

# Flatten and clean datasets

In [None]:
from challenge import flatten, remove_underscores_from_column

In [None]:
# Flatten both datasets with the project helper (nested fields appear as
# "parent__child" column names in later cells), then preview the ads.
ads_df, views_df = flatten(ads_df), flatten(views_df)
ads_df.show(n=20, truncate=True)

In [None]:
# Clean the exterior color column with the project helper before it is used as
# a grouping key (see challenge.remove_underscores_from_column for details).
exterior_color_col = "attributes__exteriorColor"
ads_df = remove_underscores_from_column(ads_df, exterior_color_col)
ads_df.show(n=20, truncate=True)


# Aggregations

In [None]:
# Switch to the pandas-on-Spark API for the pandas-style aggregations below.
ads_pdf, views_pdf = ads_df.pandas_api(), views_df.pandas_api()


In [None]:
# Cast the gross price to integers before it is averaged per make/model.
# NOTE(review): an int cast truncates any fractional part — confirm gross
# prices are whole euros, otherwise cast to float instead.
gross_price_col = "price__consumerValue__gross"
ads_pdf[gross_price_col] = ads_pdf[gross_price_col].astype(int)

The top 3 most expensive make/model combinations, by average gross price, are:

In [None]:
# Average gross price per (make, model), ranked from most to least expensive.
# The grouped means must be sorted before taking the first three rows; without
# the sort, groupby output order is arbitrary and the "top 3" is meaningless.
most_expensive = (
    ads_pdf.groupby(["make", "model"])["price__consumerValue__gross"]
    .mean()
    .reset_index()
    .sort_values(by="price__consumerValue__gross", ascending=False)
    .head(3)
)

print("The most expensive car models are:")
# Collect the (at most 3) rows locally so iteration order is the sorted order,
# and number them by position rather than by the frame's index labels.
for rank, (_, row) in enumerate(most_expensive.to_pandas().iterrows(), start=1):
    print(f"{rank}. {row['make']} {row['model']} - {round(row['price__consumerValue__gross'], 2)} EUR")

In [None]:
# Preview the first rows (pandas-on-Spark head defaults to 5).
ads_pdf.head(5)

In [None]:
# Preview the first rows (pandas-on-Spark head defaults to 5).
views_pdf.head(5)

In [None]:
# Attach each view to its ad. This is an inner join, so views whose ad__id
# has no matching ad id are dropped.
merged_pdf = ads_pdf.merge(views_pdf, how="inner", left_on="id", right_on="ad__id")
merged_pdf.head(5)

The three most popular exterior colors, measured by the number of ad views, are:

In [None]:
# Count joined view rows per exterior color and keep the three largest counts.
color_popularity = (
    merged_pdf.groupby("attributes__exteriorColor")["id"]
    .count()
    .sort_values(ascending=False)
    .head(3)
)

print("The most popular colors are:")
for exterior_color, view_count in color_popularity.items():
    print(f"{exterior_color} - {view_count} views")

In [None]:
# Per ad: earliest and latest view time, and the spread between them.
# NOTE(review): assumes event__time supports subtraction (numeric or
# datetime) — string timestamps inferred from JSON would need converting first.
ad_durations = views_pdf.groupby("ad__id").agg({"event__time": ["min", "max"]})
# Replace the two-level (event__time, min/max) column labels with flat names.
ad_durations.columns = ["min_time", "max_time"]
ad_durations = ad_durations.assign(
    time_delta=ad_durations["max_time"] - ad_durations["min_time"]
)
ad_durations

# Write output files

In [None]:
# Stage ad_durations as CSV in a "_"-prefixed output directory; a later cell
# moves the single part file to its final name and uploads it.
file_path = os.path.join(f"file://{TEMP_PATH}", "_ad_durations.csv")

# pandas-on-Spark's to_csv drops the index unless index_col is given, which
# would lose ad__id (the groupby key). Repartitioning to 1 makes Spark write
# exactly one part-*.csv, matching the single-partition Parquet outputs.
ad_durations.spark.repartition(1).to_csv(file_path, index_col="ad__id")


In [None]:
# Write ads as a single-partition Parquet dataset into a "_"-prefixed staging directory.
ads_parquet_dir = os.path.join(f"file://{TEMP_PATH}", "_ads.parquet")
ads_df.repartition(1).write.mode("overwrite").parquet(ads_parquet_dir)

In [None]:
# Write views as a single-partition Parquet dataset into a "_"-prefixed staging directory.
views_parquet_dir = os.path.join(f"file://{TEMP_PATH}", "_views.parquet")
views_df.repartition(1).write.mode("overwrite").parquet(views_parquet_dir)

In [None]:
# Promote each staged Spark output (a "_<name>" directory holding one part
# file) to a plain file named <name> in TEMP_PATH, then upload it.
# Done in Python rather than via os.system("mv ..."), whose exit status was
# ignored: a missing or duplicated part file would silently upload stale data.
import glob
import shutil

for file_name in ["ad_durations.csv", "ads.parquet", "views.parquet"]:
    file_ext = file_name.rsplit(".", 1)[-1]
    # glob's "*" skips dotfiles (e.g. Spark's .crc checksums), like the shell did.
    part_files = glob.glob(os.path.join(TEMP_PATH, f"_{file_name}", f"*.{file_ext}"))
    if len(part_files) != 1:
        raise RuntimeError(
            f"Expected exactly one *.{file_ext} part file for {file_name}, "
            f"found {len(part_files)}"
        )
    shutil.move(part_files[0], os.path.join(TEMP_PATH, file_name))
    storage_connector.write_file(file_name)

# Clean up

In [None]:
# Remove everything this notebook placed in the local temp directory.
# Uses TEMP_PATH (the directory every earlier cell reads and writes) instead of
# a hard-coded "/tmp/challenge-data", so cleanup cannot drift from the config.
# Like `rm -rf dir/*`, this keeps the directory itself and skips dotfiles.
import glob
import shutil

for leftover in glob.glob(os.path.join(TEMP_PATH, "*")):
    if os.path.isdir(leftover) and not os.path.islink(leftover):
        shutil.rmtree(leftover)
    else:
        os.remove(leftover)