In [330]:
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType, DoubleType, LongType
from pyspark.sql.functions import expr
import pyspark.sql.functions as sf


# 1. Process teinvento information

In [160]:
url_to_load = "gs://opi_raw_data/09042021/teinvento_inc/ventas_reportadas_mercado_tamales/mx/20200801/fact_table"

fact_table_schema = StructType([
        StructField("year", IntegerType()),
        StructField("month", StringType()),
        StructField("sales", DoubleType()),
        StructField("id_region", LongType()),
        StructField("id_product", StringType()),
    ])

fact_table = spark.read.schema(fact_table_schema).csv(url_to_load)

In [161]:
url_to_load = "gs://opi_raw_data/09042021/teinvento_inc/ventas_reportadas_mercado_tamales/mx/20200801/product_dim"

product_dim_schema = StructType([
        StructField("id_product", StringType()),
        StructField("calorie_category", StringType()),
        StructField("product", StringType()),
        StructField("product_brand", StringType()),
        StructField("producer", StringType()),
    ])

product_dim = spark.read.schema(product_dim_schema).csv(url_to_load)

In [162]:
url_to_load = "gs://opi_raw_data/09042021/teinvento_inc/ventas_reportadas_mercado_tamales/mx/20200801/region_dim"

region_dim_schema = StructType([
        StructField("id_region", LongType()),
        StructField("country", StringType()),
        StructField("region", StringType()),
    ])

region_dim = spark.read.schema(region_dim_schema).csv(url_to_load)



In [163]:
#Join tables and sort columns
join_expr = fact_table.id_product == product_dim.id_product
join_expr2 = fact_table.id_region == region_dim.id_region

inventadasa_table = fact_table.join(product_dim, join_expr, "inner").drop(product_dim.id_product)
inventadasa_table = inventadasa_table.join(region_dim, join_expr2, "inner").drop(region_dim.id_region)
inventadasa_table = inventadasa_table.withColumn("source", sf.lit("inventadasa"))
inventadasa_table = inventadasa_table.select("source", "year", "month", "country", "calorie_category", "id_region", "region", "id_product", "product", "product_brand", "producer", "sales")
inventadasa_table

DataFrame[source: string, year: int, month: string, country: string, calorie_category: string, id_region: bigint, region: string, id_product: string, product: string, product_brand: string, producer: string, sales: double]

# 2. Process tamales inc information

In [164]:

url_to_load = "gs://opi_raw_data/09042021/tamales_inc/ventas_mensuales_tamales_inc/mx/20200801/csv/Centro"

tamales_schema = StructType([
        StructField("year", IntegerType()),
        StructField("month", StringType()),
        StructField("country", StringType()),
        StructField("calorie_category", StringType()),
        StructField("flavor", StringType()),
        StructField("zone", StringType()),
        StructField("product_code", StringType()),
        StructField("product_name", StringType()),
        StructField("sales", DoubleType()),
    ])

ventas_centro = spark.read.schema(tamales_schema).csv(url_to_load)


In [165]:
url_to_load = "gs://opi_raw_data/09042021/tamales_inc/ventas_mensuales_tamales_inc/mx/20200801/csv/E._Privados"

tamales_schema = StructType([
        StructField("year", IntegerType()),
        StructField("month", StringType()),
        StructField("country", StringType()),
        StructField("calorie_category", StringType()),
        StructField("flavor", StringType()),
        StructField("zone", StringType()),
        StructField("product_code", StringType()),
        StructField("product_name", StringType()),
        StructField("sales", DoubleType()),
    ])

ventas_privados = spark.read.schema(tamales_schema).csv(url_to_load)

In [166]:
url_to_load = "gs://opi_raw_data/09042021/tamales_inc/ventas_mensuales_tamales_inc/mx/20200801/csv/Norte"

tamales_schema = StructType([
        StructField("year", IntegerType()),
        StructField("month", StringType()),
        StructField("country", StringType()),
        StructField("calorie_category", StringType()),
        StructField("flavor", StringType()),
        StructField("zone", StringType()),
        StructField("product_code", StringType()),
        StructField("product_name", StringType()),
        StructField("sales", DoubleType()),
    ])

ventas_norte = spark.read.schema(tamales_schema).csv(url_to_load)

In [167]:
url_to_load = "gs://opi_raw_data/09042021/tamales_inc/ventas_mensuales_tamales_inc/mx/20200801/csv/Sur"

tamales_schema = StructType([
        StructField("year", IntegerType()),
        StructField("month", StringType()),
        StructField("country", StringType()),
        StructField("calorie_category", StringType()),
        StructField("flavor", StringType()),
        StructField("zone", StringType()),
        StructField("product_code", StringType()),
        StructField("product_name", StringType()),
        StructField("sales", DoubleType()),
    ])

ventas_sur = spark.read.schema(tamales_schema).csv(url_to_load)

In [168]:
#Join tables and sort columns

tamalesinc_table = ventas_norte.union(ventas_sur).union(ventas_privados).union(ventas_centro)
join_expr = tamalesinc_table.zone == region_dim.region
tamalesinc_table = tamalesinc_table.join(region_dim, join_expr, "inner")
tamalesinc_table = tamalesinc_table.drop(region_dim.country).drop(region_dim.region).withColumnRenamed("zone", "region")
tamalesinc_table = tamalesinc_table.withColumnRenamed("product_code", "id_product")
tamalesinc_table = tamalesinc_table.withColumn("product", expr("concat(product_name, ' ', flavor)")).withColumn("product_brand", expr("concat(product_name, ' ', flavor)"))
tamalesinc_table = tamalesinc_table.withColumn("producer", sf.lit("Tamales Inc"))
tamalesinc_table = tamalesinc_table.withColumn("source", sf.lit("tamalesinc"))
tamalesinc_table = tamalesinc_table.select("source", "year", "month", "country", "calorie_category", "id_region", "region", "id_product", "product", "product_brand", "producer", "sales")
tamalesinc_table

DataFrame[source: string, year: int, month: string, country: string, calorie_category: string, id_region: bigint, region: string, id_product: string, product: string, product_brand: string, producer: string, sales: double]

In [169]:
inventadasa_table.show(3)

+-----------+----+-----+-------+----------------+------------+------+-------------+--------------------+--------------------+-----------------+------------------+
|     source|year|month|country|calorie_category|   id_region|region|   id_product|             product|       product_brand|         producer|             sales|
+-----------+----+-----+-------+----------------+------------+------+-------------+--------------------+--------------------+-----------------+------------------+
|inventadasa|2019|  Sep| Mexico|            Zero|446676598784| Norte|1279900254208|Tamal fitnes - Dulce|Tamal fitnes - Dulce|OTROS FABRICANTES|11433.743954923688|
|inventadasa|2019|  Jun| Mexico|            Zero|446676598784| Norte|1279900254208|Tamal fitnes - Dulce|Tamal fitnes - Dulce|OTROS FABRICANTES| 12210.16091084782|
|inventadasa|2018|  Jan| Mexico|            Zero|446676598784| Norte|1279900254208|Tamal fitnes - Dulce|Tamal fitnes - Dulce|OTROS FABRICANTES|      14099.178355|
+-----------+----+----

In [170]:
tamalesinc_table.show(3)

+----------+----+-----+-------+----------------+------------+------+----------+----------------+----------------+-----------+---------+
|    source|year|month|country|calorie_category|   id_region|region|id_product|         product|   product_brand|   producer|    sales|
+----------+----+-----+-------+----------------+------------+------+----------+----------------+----------------+-----------+---------+
|tamalesinc|2019|  Dec| Mexico|         Regular|446676598784| Norte|ABYT055818|Atole mix Cajeta|Atole mix Cajeta|Tamales Inc| -3.2E-13|
|tamalesinc|2020|  May| Mexico|         Regular|446676598784| Norte|ABYT055818|Atole mix Cajeta|Atole mix Cajeta|Tamales Inc|      0.0|
|tamalesinc|2019|  Jan| Mexico|         Regular|446676598784| Norte| 206050084| Tamal mix Dulce| Tamal mix Dulce|Tamales Inc|126993.08|
+----------+----+-----+-------+----------------+------------+------+----------+----------------+----------------+-----------+---------+
only showing top 3 rows



# 3. Union both tables

In [173]:
full_table = inventadasa_table.union(tamalesinc_table)
full_table

DataFrame[source: string, year: int, month: string, country: string, calorie_category: string, id_region: bigint, region: string, id_product: string, product: string, product_brand: string, producer: string, sales: double]

In [199]:
#sum_sales = sf.sum("sales").alias("sales_month_by_calorie_category")
#sales_month_by_calorie_category = full_table.groupBy("year", "month", "id_product", "calorie_category").agg(sum_sales)

In [203]:
#join_expr = (full_table.year == sales_month_by_calorie_category.year) & (full_table.month == sales_month_by_calorie_category.month) & (full_table.id_product == sales_month_by_calorie_category.id_product) & (full_table.calorie_category == sales_month_by_calorie_category.calorie_category)
#engineered_table = full_table.join(sales_month_by_calorie_category, join_expr, "inner").drop(sales_month_by_calorie_category.year).drop(sales_month_by_calorie_category.month).drop(sales_month_by_calorie_category.id_product).drop(sales_month_by_calorie_category.calorie_category)
#engineered_table = engineered_table.select("source", "year", "month", "country", "calorie_category", "id_region", "region", "id_product", "product", "product_brand", "producer", "sales")

In [317]:
#Sum across sources and regions
sum_sales = sf.sum("sales").alias("total_sales")
sum_table = full_table.groupBy("year", "month", "country", "calorie_category", "id_product", "product", "product_brand", "producer").agg(sum_sales)

In [318]:
mapping_month= {
        'Jan': "1",
        'Feb': "2",
        'Mar': "3",
        'Apr': "4",
        'May': "5",
        'Jun': "6",
        'Jul': "7",
        'Aug': "8",
        'Sep': "9",
        'Oct': "10",
        'Nov': "11",
        'Dec': "12",
    }

In [319]:
calorie_category_agg

DataFrame[year: int, month: string, id_product: string, calorie_category: string, avg_sales_month_by_calorie_category: double, max_sales_month_by_calorie_category: double, weight_sales_month_by_calorie_category: double]

In [320]:
#Sum grouped by year, month, product and category
avg_product_category = sf.avg("total_sales").alias("avg_sales_month_by_calorie_category")
max_product_category = sf.max("total_sales").alias("max_sales_month_by_calorie_category")
calorie_category_agg = sum_table.groupBy("year", "month", "id_product", "calorie_category").agg(*[avg_product_category, max_product_category])
calorie_category_agg = calorie_category_agg.withColumn("weight_sales_month_by_calorie_category", sf.col("avg_sales_month_by_calorie_category")/sf.col("max_sales_month_by_calorie_category"))

join_expr = (sum_table.year == calorie_category_agg.year) & (sum_table.month == calorie_category_agg.month) & (sum_table.id_product == calorie_category_agg.id_product) & (sum_table.calorie_category == calorie_category_agg.calorie_category)
engineered_table = sum_table.join(calorie_category_agg, join_expr, "inner").drop(calorie_category_agg.year).drop(calorie_category_agg.month).drop(calorie_category_agg.id_product).drop(calorie_category_agg.calorie_category)
engineered_table = engineered_table.replace(to_replace=mapping_month, subset=['month'])
engineered_table = engineered_table.withColumn("year_month", expr("to_date(concat('1', '/', month,'/',year), 'd/M/y')"))
engineered_table = engineered_table.select("year", "month", "year_month", "country", "calorie_category", "id_product", "product", "product_brand", "producer", "total_sales", "weight_sales_month_by_calorie_category")
engineered_table

DataFrame[year: int, month: string, year_month: date, country: string, calorie_category: string, id_product: string, product: string, product_brand: string, producer: string, total_sales: double, weight_sales_month_by_calorie_category: double]

In [335]:
1+2

3

In [338]:
timestamp = time.strftime("%Y%m%d-%H%M%S")
engineered_table.createOrReplaceTempView("engineered_table")
train_set = spark.sql("""
          SELECT *
          FROM engineered_table
          WHERE MOD(ABS(HASH(id_product)), 10) < 8""")

url_to_dump_train = "gs://opi_processed_data/" + timestamp + "/train_set"
train_set.write.parquet(url_to_dump_train)

In [None]:
test_set = spark.sql("""
          SELECT *
          FROM engineered_table
          WHERE MOD(ABS(HASH(id_product)), 10) >= 8""")

url_to_dump_test = "gs://opi_processed_data/" + timestamp + "/test_set"
test_set.write.parquet(url_to_dump_test)

In [197]:
import time
timestamp = time.strftime("%Y%m%d-%H%M%S")
url_to_dump = "gs://opi_processed_data/{}/full_table/".format(timestamp)

engineered_table.write.csv(url_to_dump)

In [345]:
import os
import google.auth
import gcsfs
import pandas as pd

#os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "secrets/youtubelist-256522-d34f74f9ba1c.json"

credentials, _ = google.auth.default()
fs = gcsfs.GCSFileSystem(project='youtubelist-256522')
files = [path for path in fs.ls('opi_processed_data/04122021010330/train_set') if path.endswith(".parquet")]

df = pd.DataFrame()
for file_ in files:
    csv = pd.read_parquet("gs://" + file_)
    df = df.append(csv)


In [10]:
df.groupby(["product", "calorie_category"])["sales"].mean()

product                         calorie_category
Atole mix Arroz                 Regular             4.626890e+01
Atole mix Cajeta                Regular             5.002225e+05
Atoles doña chona - Cajeta      Regular             1.646782e+05
Champurrado Ramirez             Regular             8.594341e+06
Tamal fitnes - Dulce            Zero                8.636620e+03
Tamal mix Dulce                 Regular             2.735189e+07
Tamal mix Mole                  Light               3.088007e+03
                                Regular             3.908680e+02
Tamal mix Verde                 Regular             2.201148e+07
Tamales doña chona - Chocolate  Regular             8.791338e+00
Tameles doña chona -Piña        Regular             2.177898e+07
Name: sales, dtype: float64

In [22]:
from pyspark.sql import functions as f

calories_per_month = f.avg("sales").alias("peso_mes_por_categoria_calorica")

peso_mes_por_categoria_calorica = full_table_reordered \
        .groupBy("product", "calorie_category") \
        .agg(calories_per_month)

In [23]:
peso_mes_por_categoria_calorica

DataFrame[product: string, calorie_category: string, peso_mes_por_categoria_calorica: double]

In [25]:
#join_expr = (full_table_reordered.product == peso_mes_por_categoria_calorica.product) & (full_table_reordered.calorie_category == peso_mes_por_categoria_calorica.calorie_category)

#full_table_reordered_engineered = full_table_reordered.join(peso_mes_por_categoria_calorica, join_expr, "inner").drop(peso_mes_por_categoria_calorica.product).drop(peso_mes_por_categoria_calorica.calorie_category)

In [None]:
"year", "month", "year_month", "country", "calorie_category", "id_region", "region", "id_product", "product", "product_brand", "producer", "total_sales", "weight_sales_month_by_calorie_category

In [326]:
from google.cloud import bigquery

client = bigquery.Client()

# Perform a query.
QUERY = (
    "SELECT * FROM `youtubelist-256522.products_feature_store.features` WHERE id_product = '{}' ORDER BY year_month DESC LIMIT 10".format("206050084")
)
print(QUERY)
query_job = client.query(QUERY)  
rows = query_job.result()  

for row in rows:
    print(row.country, row.calorie_category, row.id_product, row.total_sales, row.year, row.month, row.calorie_category)

SELECT * FROM `youtubelist-256522.products_feature_store.features` WHERE id_product = '206050084' ORDER BY year_month DESC LIMIT 10
Mexico Regular 206050084 4638400.479309 2020 8 Regular
Mexico Regular 206050084 4628353.148585998 2020 7 Regular
Mexico Regular 206050084 5712945.703754998 2020 6 Regular
Mexico Regular 206050084 4664998.5243599955 2020 5 Regular
Mexico Regular 206050084 4415791.024505978 2020 4 Regular
Mexico Regular 206050084 5682114.538013995 2020 3 Regular
Mexico Regular 206050084 4308507.781618011 2020 2 Regular
Mexico Regular 206050084 3585574.937336004 2020 1 Regular
Mexico Regular 206050084 7455721.787800002 2019 12 Regular
Mexico Regular 206050084 4604966.3160190005 2019 11 Regular


In [325]:
"SELECT * FROM youtubelist-256522.products_feature_store.features WHERE id_product = {} LIMIT 100".format("206050084")

'SELECT * FROM youtubelist-256522.products_feature_store.features WHERE id_product = 206050084 LIMIT 100'

In [340]:

print("date and time:",date_time)

date and time: 04122021052203


In [341]:
80//10

8

In [342]:
"""This is my goal {}""".format(10//10)

'This is my goal 1'