# Creating the Session

We import the Apache Spark modules.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.getOrCreate()

# Populating the Analytics Layer

Step 1: we define the path to the data from the previous layer (staging).

## Populating 'Products'

In [3]:
ruta_products_staging = "gs://curso-bigdata/datalake/staging/products/"

In [4]:
# Parquet stores its own schema, so the CSV-only "header" option is not needed
df_products = spark.read.format("parquet").load(ruta_products_staging)

## Populating 'Customers'

In [5]:
ruta_customers_staging = "gs://curso-bigdata/datalake/staging/customers/"

In [6]:
# Parquet stores its own schema, so the CSV-only "header" option is not needed
df_customers = spark.read.format("parquet").load(ruta_customers_staging)

## Populating 'Orders'

In [7]:
ruta_orders_staging = "gs://curso-bigdata/datalake/staging/orders/"

In [8]:
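# Parquet stores its own schema, so the CSV-only "header" option is not needed
df_orders = spark.read.format("parquet").load(ruta_orders_staging)

## Integrating the Data

We create an id column in each DataFrame, similar to a 'primary key' in a database. Note that monotonically_increasing_id() guarantees unique, increasing values, but not consecutive ones.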

In [9]:
df_products = df_products.withColumn("id_p", monotonically_increasing_id())

In [10]:
df_customers = df_customers.withColumn("id_c", monotonically_increasing_id())
# Rename the column to avoid ambiguous column names when joining
df_customers = df_customers.withColumnRenamed("customer_id","customer_id_x")

In [11]:
df_orders = df_orders.withColumn("id_o", monotonically_increasing_id())
df_orders = df_orders.withColumnRenamed("customer_id","customer_id_y")
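
The ids produced by monotonically_increasing_id() depend on how each DataFrame is partitioned. According to the Spark documentation, the current implementation puts the partition index in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below mimics that layout without running Spark; the function name `simulated_monotonic_ids` and the partition sizes are hypothetical, chosen only for illustration.

```python
def simulated_monotonic_ids(partition_sizes):
    """Mimic the ids Spark would assign, given the row count of each partition.

    Assumes the documented layout: partition index in the upper bits,
    record number within the partition in the lower 33 bits.
    """
    ids = []
    for partition_index, row_count in enumerate(partition_sizes):
        base = partition_index << 33
        ids.extend(base + record for record in range(row_count))
    return ids


# Two hypothetical DataFrames with 6 rows each, but different partition layouts.
customers_ids = simulated_monotonic_ids([3, 3])
orders_ids = simulated_monotonic_ids([4, 2])

# Ids that an inner join on these columns would be able to match.
shared = sorted(set(customers_ids) & set(orders_ids))

print(customers_ids)
print(orders_ids)
print(shared)
```

In this toy case only five of the six ids appear on both sides, so an inner join on such columns can silently drop rows whenever the two DataFrames are partitioned differently.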

The DataFrame resulting from the joins is stored in cop_data.

In [12]:
cust_order = df_customers.join(df_orders,df_customers.id_c == df_orders.id_o)

In [13]:
cop_data = cust_order.join(df_products, cust_order.id_o == df_products.id_p)

In [14]:
cop_data.printSchema()

root
 |-- customer_id_x: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- home_address: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- id_c: long (nullable = false)
 |-- order_id: integer (nullable = true)
 |-- customer_id_y: integer (nullable = true)
 |-- payment: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- id_o: long (nullable = false)
 |-- product_ID: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- colour: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- product_type: string (nullab

We add columns that make the data easier to work with and that can be used for partitioning.

In [15]:
cop_data = cop_data.withColumn("sales",cop_data.price * cop_data.quantity)

In [16]:
cop_data = cop_data.withColumn("year_order",year(cop_data.order_date))
cop_data = cop_data.withColumn("month_order",month(cop_data.order_date))
cop_data = cop_data.withColumn("day_order",dayofmonth(cop_data.order_date))

In [17]:
cop_data = cop_data.withColumn("year_delivery",year(cop_data.delivery_date))
cop_data = cop_data.withColumn("month_delivery",month(cop_data.delivery_date))
cop_data = cop_data.withColumn("day_delivery",dayofmonth(cop_data.delivery_date))

We drop the id columns, which were only needed for the joins and are no longer relevant.

In [18]:
cop_data = cop_data.drop("id_c", "id_o", "id_p")

### Saving the resulting DataFrame 'cop_data'

In [19]:
ruta_cop_data_analytics = "gs://curso-bigdata/datalake/analytics/"

cop_data.write.mode("overwrite")\
    .format("parquet")\
    .save(ruta_cop_data_analytics)

22/04/04 19:41:56 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                