# <strong>Proceso:</strong> Prueba Yapo.cl
## <strong>Objetivo:</strong>
##### Generar un archivo Json con las series de tiempo por producto, tomando en cuenta la fecha y la cantidad de productos vendidos ese día.

## <h2>Descargas e Importaciones Iniciales:</h2>

In [1]:
#Establece conexión a cuenta de google drive que contiene la data input
from google.colab import drive
drive.mount('/content/drive')


#Descarga de java jdk
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Descarga Apache Spark
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
#Descomprime Tar
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
#Instala Findspark
!pip install -q findspark

from datetime import datetime
import json
import os
#Define Variables de Entorno
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Mounted at /content/drive


## Definición de Funciones

In [2]:
#----- Función de inicio de sesión Spark -----
def init_spark():
  spark = SparkSession.builder\
        .master("local")\
        .appName("YapoCL")\
        .getOrCreate()
  return spark

#----- Funcion que unifica todos los csv en un solo Dataframe -----
def data_merging(main_path, folders, egCSV_name):
  for folder in folders:
    month_files = os.listdir(main_path+folder)
    #Lectura del CSV que contiene los headers
    df_eg = spark.read.options(header='True', inferSchema='True', delimiter=',')\
                .csv(main_path+folder+"/"+egCSV_name)
    
    mySchema = df_eg.schema
    #Elimina CSV que ya fue leído de la lista de CSV por leer
    month_files.remove(egCSV_name)

    #Construye rutas completas (path) de los csv para su lectura
    for position in range(0, len(month_files)):
      month_files[position] = main_path+folder+"/"+month_files[position]

    #Lee la lista de CSV pendientes
    month_df = spark.read.options(delimiter=',')\
                    .schema(mySchema)\
                    .csv(month_files)

    if folder == folders[0]:
      month_df_all = month_df.unionByName(df_eg)
    else:
      month_df_aux = month_df.unionByName(df_eg)
      month_df_all = month_df_all.unionByName(month_df_aux)
  return month_df_all

#----- Función que procesa el Dataframe de ventas por productos y fechas -----
def sales_per_product_df(month_df_all):
  month_df_all = month_df_all.withColumn("creation_date", to_date(col("creation_date"), "yyyy-MM-dd"))\
                           .groupBy("product_name", "creation_date")\
                           .agg(count(col("product_name")).alias("num_of_sales"))\
                           .sort("product_name", "creation_date")

  return month_df_all

#----- Función que genera el diccionario de ventas por producto a partir del Dataframe previamente generado -----
def sales_per_product_dict(month_df_all, data_dict):
  product_list = month_df_all.select("product_name").distinct().collect()

  for product in product_list:
    
    date_list = month_df_all.filter(col("product_name")==product.product_name).select("creation_date").collect()
    sales_list = month_df_all.filter(col("product_name")==product.product_name).select("num_of_sales").collect()
    for elem in range(0, len(date_list) ):
      if elem == 0:
        data_dict[product.product_name] = {}
      data_dict[product.product_name][date_list[elem].creation_date.strftime("%Y-%m-%d")] = sales_list[elem].num_of_sales

  
  return data_dict

#----- Función que escribe archivo Json a partir del diccionario creado -----
def dict_to_jsonFile(data_dict, file_name, main_path):
  with open(main_path+file_name, "w") as output:
    json.dump(data_dict, output, indent=6)

### Definición de Variables

In [3]:
# Ruta Drive donde se encuentran los CSV
ruta_principal = "drive/MyDrive/Prueba Yapo/"

# Carpetas de los meses de información disponibles
carpetas = ["may", "june"]


# Nombre del único archivo CSV por carpeta que contiene headers
#(debe ser el mismo para todas las carpetas definidas)
headers_csvNombre = "products00.csv"

# Nombre de archivo Json de salida
nombre_output = "sales.json"


#Se crea diccionario con key "", el cual sera eliminado previo a exportar el archivo json
dataDict = {"": {}}

# Proceso

In [4]:
# Inicio de Spark
spark = init_spark()

# Union de CSVs
consolidado_df = data_merging(ruta_principal, carpetas, headers_csvNombre)


# Calculo de ventas de productos por fecha
ventas_df = sales_per_product_df(consolidado_df)

ventas_df = ventas_df.persist()

# Generación del Diccionario a partir del Dataframe procesado
dataDict = sales_per_product_dict(ventas_df, dataDict)
#Elimina key ""
del dataDict[""]


# Escritura de Archivo Json
dict_to_jsonFile(dataDict, nombre_output, ruta_principal)
