In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession 
import pyspark.sql.functions as F
import pandas as pd
import numpy as np

In [None]:
# Configure the session builder one setting at a time, then obtain the
# SparkSession used by every later cell.
builder = SparkSession.builder

# Limit on the size of results sent back to the driver.
builder = builder.config('spark.driver.maxResultSize', '21000m')
# Scheduling mode for jobs submitted within this application.
builder = builder.config('spark.scheduler.mode', 'FIFO')
# Memory requested per executor.
builder = builder.config('spark.executor.memory', '15000m')

spark = builder.getOrCreate()

In [None]:
path = 'gs://bkt-dataproc-prod-eu-notebookmatthieubritoantunes/notebooks/jupyter/'

# Read the category listing with pandas (no header row, ';' as separator).
# NOTE(review): presumably every line looks like "<id> - <category>" — confirm
# against the actual file.
categories_pdf = pd.read_csv(path + 'google_product_categories.txt',
                             sep=';',
                             header=None)

# Column '0' is the first headerless column; split it once on ' - ' and reuse
# the resulting array column for both output fields.
category_parts = F.split('0', ' - ')

google_categories = spark.createDataFrame(data=categories_pdf).select(
    category_parts.getItem(0).alias('id'),
    category_parts.getItem(1).alias('product_category'),
)

In [None]:
# BigQuery reader for the best-sellers table; no data is loaded in this cell.
reader = (
    spark.read
    .format('bigquery')
    .options(
        parentProject='tr-tech-innovation-dev',
        project='tr-tech-innovation-dev',
        dataset='merchant',
        table='BestSellers_TopProducts_8090258',
        # Keep only the partitions dated within October 2020.
        filter='_PARTITIONDATE >= "2020-10-01" AND _PARTITIONDATE <= "2020-10-31"',
    )
)

In [None]:
# Turn the configured reader into a DataFrame ...
top_products_tmp = reader.load()
# ... and register it under a name that Spark SQL queries can reference.
top_products_tmp.createOrReplaceTempView('top_products_db')

In [None]:
# Project and rename the fields of interest from the registered temp view.
top_products_ = spark.sql('''SELECT
                                  rank_timestamp AS date,
                                  product_title.name AS product_name,
                                  brand AS product_brand,
                                  rank AS product_rank,
                                  previous_rank AS previous_product_rank,
                                  ranking_category AS product_ranking_category,
                                  ranking_country AS country,
                                  price_range.min AS product_min_price,
                                  price_range.max AS product_max_price,
                                  price_range.currency AS price_currency
                              FROM
                                  top_products_db
                         ''')

# Midpoint of the price range: half of (max + min).
price_midpoint = 0.5 * (F.col('product_max_price') + F.col('product_min_price'))

# Rows are matched to the category lookup through the ranking category id.
category_match = F.col('product_ranking_category') == F.col('id')

top_products_ = (
    top_products_
    .withColumn('avg_product_price', price_midpoint)
    # Left outer join: products without a matching category are kept,
    # with a null `product_category`.
    .join(google_categories, on=category_match, how='leftOuter')
)

# Mark the result for caching and run an action so the cache gets populated;
# the row count is displayed as the cell output.
top_products_.cache().count()

In [None]:
# Keep only rows where every required field is present, then project the
# final column set. Successive `.where` calls combine as a logical AND.
top_products = (
    top_products_
    .where(F.col('date').isNotNull())
    .where(F.col('product_brand').isNotNull())
    .where(F.col('product_category').isNotNull())
    .where(F.col('country').isNotNull())
    .where(F.col('product_min_price').isNotNull())
    .where(F.col('product_max_price').isNotNull())
    .where(F.col('price_currency').isNotNull())
    .select(
        'date',
        # `product_name` is indexable; only its first element is kept.
        F.col('product_name').getItem(0).alias('product_name'),
        'product_brand',
        'product_category',
        'product_rank',
        'previous_product_rank',
        'country',
        # Exposed as `product_price`: the average of the min and max price.
        F.col('avg_product_price').alias('product_price'),
        'price_currency',
    )
)

# Cache the cleaned frame and trigger an action; the count is the cell output.
top_products.cache().count()

In [None]:
# `del` alone only removes the Python name: the DataFrame was marked with
# `.cache()` earlier in this notebook, and its cached data stays in Spark
# until it is explicitly unpersisted. Release the cache first, then drop
# the name.
# NOTE(review): `top_products` was derived from this frame and has already
# been cached and counted; on Spark versions where unpersist is
# non-cascading its cache is untouched — confirm for the cluster's version.
top_products_.unpersist()
del top_products_

In [None]:
# Earliest `date` present in the cleaned data.
top_products.select(F.min('date')).show()

In [None]:
# Latest `date` present in the cleaned data.
top_products.select(F.max('date')).show()

In [None]:
path = 'gs://bkt-dataproc-prod-eu-notebookmatthieubritoantunes/google-merchant-centre/'

# Write the cleaned data as Parquet, replacing any existing output at the
# target location ('overwrite' mode).
# The former `.options(header='true', delimiter=',')` call was removed:
# header/delimiter are CSV settings, have no meaning for a Parquet write,
# and wrongly suggested the output was CSV.
(top_products.write
 .mode('overwrite')
 .parquet(path + 'top_products_gmc_10_2020.parquet')
)