In [2]:
#Start session Spark
from pyspark.sql import SparkSession
from pyspark import SparkContext
BIGQUERY_CONNECTOR_JAR = 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'

# Build (or reuse) the Spark session with the BigQuery connector jar on the
# classpath, which the "bigquery" data source used by the read/write cells needs.
# NOTE(review): the "latest" connector jar is unpinned, so a connector release can
# change behaviour between runs — consider pinning a version.
spark = (
    SparkSession.builder
    .appName('clean_products_magdiel')
    .config('spark.jars', BIGQUERY_CONNECTOR_JAR)
    .getOrCreate()
)

# Eagerly evaluate and display a DataFrame that is the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [3]:
##########################################################
#######Extract table products from BigQuery Staging ######
##########################################################

In [4]:
# Staging table (dataset.table) with the raw products
table_products = "becade_mgutierrez.stg_products"

# Read the staging table into a DataFrame through the BigQuery connector
stg_products = (
    spark.read
    .format("bigquery")
    .option("table", table_products)
    .load()
)

# Row count and schema of the incoming data, for traceability
print("lines incoming: " , stg_products.count())
stg_products.printSchema()

lines incoming:  5352
root
 |-- _airbyte_ab_id: string (nullable = true)
 |-- _airbyte_emitted_at: long (nullable = true)
 |-- app_sale_price: string (nullable = true)
 |-- app_sale_price_currency: string (nullable = true)
 |-- country: string (nullable = true)
 |-- evaluate_rate: string (nullable = true)
 |-- isbestseller: boolean (nullable = true)
 |-- isprime: boolean (nullable = true)
 |-- original_price: string (nullable = true)
 |-- product_detail_url: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_main_image_url: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- rowid: long (nullable = true)



In [5]:
from pyspark.sql.functions import col, when,regexp_extract , translate ,regexp_replace ,substring,concat,max, last
from pyspark.sql.types import IntegerType,DecimalType,DateType,StringType, DoubleType

In [6]:
# Keep only the product columns needed by the cleaning steps
product_columns = ['product_id', 'country', 'app_sale_price', 'evaluate_rate',
                   'isbestseller', 'isprime', 'app_sale_price_currency']
raw_products = stg_products.select(*product_columns)

In [7]:
# Drop products whose price is the literal string 'None'. A null price is dropped
# as well, because the comparison evaluates to null and filter() discards it.
raw_products = raw_products.filter(col('app_sale_price') != 'None')

#show outgoing lines
print("lines clean outgoing: " , raw_products.count())

lines clean outgoing:  4905


In [10]:
# Keep only products with a usable evaluate_rate: rows where it is null, an empty
# string, or the literal string "None" are dropped. Surviving values are unchanged.
rate_value = col("evaluate_rate")
has_rate = rate_value.isNotNull() & (rate_value != "") & (rate_value != "None")
df_raw_products = raw_products.where(has_rate)

#show outgoing lines
print("lines clean outgoing: " , df_raw_products.count())

lines clean outgoing:  4628


In [11]:
# Remove rows that are exact duplicates across every selected column
# (distinct() is the same as dropDuplicates() without a column subset).
df_raw_products = df_raw_products.distinct()

#show outgoing lines
print("lines clean outgoing: " , df_raw_products.count())

lines clean outgoing:  3883


In [12]:
# Extract the first "digit, separator, digit" rating (e.g. "4,5" or "4.5") from
# evaluate_rate and normalise a decimal comma to a dot. Rows without such a
# pattern get an empty string (regexp_extract's no-match result).
rate_pattern = r'([0-9][\.\,][0-9])'
df_clean_rate = df_raw_products.withColumn(
    'clean_rate',
    translate(regexp_extract(col('evaluate_rate'), rate_pattern, 1), ',', '.'),
)

In [13]:
# Normalise app_sale_price (e.g. "18,19", "9,99", "10.000,00", "￥1,234") into an
# integer part (number_price) and a decimal part (decimal_price):
#   app_raw_price    : ',' -> '.' and the '￥' sign removed
#   decimal_price    : trailing '.' + 1-2 digits (cents), or "" when there is none
#   raw_number_price : the price without that decimal part (may still contain
#                      '.' thousands separators)
#   number_price     : raw_number_price with the '.' separators removed
# Any character other than a digit or '.' (spaces, other currency signs) is
# stripped before splitting. The integer part is "everything before the decimals"
# rather than a 2-3 digit match, which keeps single-digit prices
# ("9,99" -> 9 / .99) and multi-digit thousands groups ("10.000,00" -> 10000 / .00).
price_chars = regexp_replace(col('app_raw_price'), r'[^0-9\.]', '')

df_raw_price = (
    df_clean_rate
    .withColumn('app_raw_price', translate(col('app_sale_price'), ',￥', '.'))
    .withColumn('decimal_price', regexp_extract(price_chars, r'(\.[0-9]{1,2})$', 1))
    .withColumn('raw_number_price', regexp_replace(price_chars, r'\.[0-9]{1,2}$', ''))
    .withColumn('number_price', translate(col('raw_number_price'), '.', ''))
)

#Show row products
df_raw_price.select('product_id','country','app_sale_price','app_raw_price','raw_number_price','number_price','decimal_price').show(5,truncate=False)

+----------+-------+--------------+-------------+----------------+------------+-------------+
|product_id|country|app_sale_price|app_raw_price|raw_number_price|number_price|decimal_price|
+----------+-------+--------------+-------------+----------------+------------+-------------+
|B07FXP7HVS|IT     |18,19         |18.19        |18              |18          |.19          |
|B077T5RQF7|IT     |50,48         |50.48        |50              |50          |.48          |
|B074VMTP68|DE     |29,99         |29.99        |29              |29          |.99          |
|B00QHC01C2|NL     |29,72         |29.72        |29              |29          |.72          |
|B01GFJWHZ0|NL     |21,43         |21.43        |21              |21          |.43          |
+----------+-------+--------------+-------------+----------------+------------+-------------+
only showing top 5 rows



In [14]:
# Rebuild a plain "nnnn.nn" price string from its integer and decimal parts and
# keep it under the column name app_sale_price.
kept_columns = ['product_id', 'country', 'isbestseller', 'isprime',
                'app_sale_price_currency', 'clean_rate']
plain_price = concat(col('number_price'), col('decimal_price')).alias("app_sale_price")
df_clean_products_raw = df_raw_price.select(*kept_columns, plain_price)

#Show row products
df_clean_products_raw.show(5)

+----------+-------+------------+-------+-----------------------+----------+--------------+
|product_id|country|isbestseller|isprime|app_sale_price_currency|clean_rate|app_sale_price|
+----------+-------+------------+-------+-----------------------+----------+--------------+
|B07FXP7HVS|     IT|        true|  false|                      €|       4.1|         18.19|
|B077T5RQF7|     IT|        true|   true|                      €|       4.4|         50.48|
|B074VMTP68|     DE|        true|   true|                      €|       4.4|         29.99|
|B00QHC01C2|     NL|       false|   true|                      €|       4.5|         29.72|
|B01GFJWHZ0|     NL|        true|   true|                      €|       4.5|         21.43|
+----------+-------+------------+-------+-----------------------+----------+--------------+
only showing top 5 rows



In [15]:
##########################################################
##extract table exchange_rate from BigQuery Staging ######
######## PRODUCTS FROM ALL COUNTRIES (continued) ########

In [16]:
# Staging table with yearly exchange rates per country
table_exchange = "becade_mgutierrez.stg_tasas_cambio_pais_anual"

# Read the staging table into a DataFrame through the BigQuery connector
stg_exchange = (
    spark.read
    .format("bigquery")
    .option("table", table_exchange)
    .load()
)

# Schema and row count of the incoming data, for traceability
stg_exchange.printSchema()
print("lines incoming: " , stg_exchange.count())

root
 |-- _airbyte_ab_id: string (nullable = true)
 |-- _airbyte_emitted_at: long (nullable = true)
 |-- Alpha_2_code: string (nullable = true)
 |-- Alpha_3_code: string (nullable = true)
 |-- Country_name: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- currency: string (nullable = true)
 |-- rowid: long (nullable = true)
 |-- value: double (nullable = true)

lines incoming:  3606


In [17]:
# Keep only the exchange-rate columns needed downstream
exchange_columns = ['Alpha_2_code', 'Alpha_3_code', 'Country_name', 'Year', 'currency', 'value']
raw_exchange = stg_exchange.select(*exchange_columns)

In [18]:
# Give the exchange-rate columns descriptive snake_case names
# (columns not listed in the mapping keep their name).
exchange_renames = {
    'Alpha_2_code': 'country_code',
    'Alpha_3_code': 'country_code_iso',
    'Country_name': 'country_name',
    'Year': 'year_rate',
    'currency': 'currency_name',
    'value': 'value_rate',
}
raw_exchange = raw_exchange.select(
    [col(name).alias(exchange_renames.get(name, name)) for name in raw_exchange.columns]
)

#Show row exchange
raw_exchange.show(2)

+------------+----------------+------------+---------+-------------+----------+
|country_code|country_code_iso|country_name|year_rate|currency_name|value_rate|
+------------+----------------+------------+---------+-------------+----------+
|          AR|             ARG|   Argentina|     1962|          ARS|       0.0|
|          AR|             ARG|   Argentina|     1963|          ARS|       0.0|
+------------+----------------+------------+---------+-------------+----------+
only showing top 2 rows



In [19]:
# Latest exchange rate per country: for each country_code keep the value_rate of
# its most recent year_rate.
# last() inside a groupBy has no row-order guarantee, so it may return a rate from
# any year. Taking max() over a (year_rate, value_rate) struct compares year_rate
# first, so it deterministically returns the rate that belongs to the latest year.
from pyspark.sql.functions import struct

df_group_rate = (
    raw_exchange
    # Only positive rates are eligible, so a missing or zero rate can never
    # become the divisor used for the USD price.
    .where(col('value_rate') > 0)
    .groupBy('country_code')
    .agg(max(struct('year_rate', 'value_rate')).alias('latest'))
    .select(
        'country_code',
        col('latest.year_rate').alias('max_year'),
        col('latest.value_rate').alias('value_exchange'),
    )
    # display order only (ascending by country code)
    .orderBy('country_code')
)

#show outgoing lines
print("lines clean outgoing: " , df_group_rate.count())

#Show row exchange
df_group_rate.show(5)

lines clean outgoing:  58
+------------+--------+--------------+
|country_code|max_year|value_exchange|
+------------+--------+--------------+
|          AR|    2020|      70.53917|
|          AT|    2020|      0.875506|
|          AU|    2020|      1.453085|
|          BE|    2020|      0.875506|
|          BG|    2020|      1.716333|
+------------+--------+--------------+
only showing top 5 rows



In [20]:
# Attach each country's exchange rate to its products. The join is inner, so
# products from countries without a rate are dropped.
rate_by_country = df_group_rate.alias('rate')
products_by_country = df_clean_products_raw.alias('price')
same_country = col('price.country') == col('rate.country_code')

# Keep only the product fields plus the matched country code and rate
merged_columns = ['product_id', 'isbestseller', 'isprime', 'app_sale_price_currency',
                  'clean_rate', 'app_sale_price', 'country_code', 'value_exchange']
df_merge_rows = (
    rate_by_country
    .join(products_by_country, same_country, "inner")
    .select(*merged_columns)
)

#show outgoing lines
print("lines clean outgoing: " , df_merge_rows.count())

#Show row merge
df_merge_rows.show(5)

lines clean outgoing:  3469
+----------+------------+-------+-----------------------+----------+--------------+------------+--------------+
|product_id|isbestseller|isprime|app_sale_price_currency|clean_rate|app_sale_price|country_code|value_exchange|
+----------+------------+-------+-----------------------+----------+--------------+------------+--------------+
|B07FXP7HVS|        true|  false|                      €|       4.1|         18.19|          IT|      0.875506|
|B077T5RQF7|        true|   true|                      €|       4.4|         50.48|          IT|      0.875506|
|B074VMTP68|        true|   true|                      €|       4.4|         29.99|          DE|      0.875506|
|B00QHC01C2|       false|   true|                      €|       4.5|         29.72|          NL|      0.875506|
|B01GFJWHZ0|        true|   true|                      €|       4.5|         21.43|          NL|      0.875506|
+----------+------------+-------+-----------------------+----------+--------

In [21]:
# US-dollar equivalent of each product's price: the local price divided by the
# country's exchange rate (presumably local currency units per USD — confirm).
# app_sale_price is still a string here; Spark casts it implicitly for the division.
usd_price = col('app_sale_price') / col('value_exchange')
df_raw_products = df_merge_rows.withColumn('app_sale_price_us', usd_price)

#Show first 2 rows
df_raw_products.show(n=2, truncate=False)

#Display Schema
df_raw_products.printSchema()

+----------+------------+-------+-----------------------+----------+--------------+------------+--------------+------------------+
|product_id|isbestseller|isprime|app_sale_price_currency|clean_rate|app_sale_price|country_code|value_exchange|app_sale_price_us |
+----------+------------+-------+-----------------------+----------+--------------+------------+--------------+------------------+
|B07FXP7HVS|true        |false  |€                      |4.1       |18.19         |IT          |0.875506      |20.776556642672926|
|B077T5RQF7|true        |true   |€                      |4.4       |50.48         |IT          |0.875506      |57.65808572414124 |
+----------+------------+-------+-----------------------+----------+--------------+------------+--------------+------------------+
only showing top 2 rows

root
 |-- product_id: string (nullable = true)
 |-- isbestseller: boolean (nullable = true)
 |-- isprime: boolean (nullable = true)
 |-- app_sale_price_currency: string (nullable = true)
 |

In [22]:
# Rename the working columns to their final product_* names
# (columns not listed in the mapping keep their name).
product_renames = {
    'isprime': 'product_is_prime',
    'app_sale_price_currency': 'product_price_currency',
    'isbestseller': 'product_is_bestseller',
    'clean_rate': 'product_rate',
    'app_sale_price': 'product_price',
    'country_code': 'product_country',
    'app_sale_price_us': 'product_price_us',
}
df_full_products = df_raw_products.select(
    [col(name).alias(product_renames.get(name, name)) for name in df_raw_products.columns]
)

#Show row products
df_full_products.show(2)

+----------+---------------------+----------------+----------------------+------------+-------------+---------------+--------------+------------------+
|product_id|product_is_bestseller|product_is_prime|product_price_currency|product_rate|product_price|product_country|value_exchange|  product_price_us|
+----------+---------------------+----------------+----------------------+------------+-------------+---------------+--------------+------------------+
|B07FXP7HVS|                 true|           false|                     €|         4.1|        18.19|             IT|      0.875506|20.776556642672926|
|B077T5RQF7|                 true|            true|                     €|         4.4|        50.48|             IT|      0.875506| 57.65808572414124|
+----------+---------------------+----------------+----------------------+------------+-------------+---------------+--------------+------------------+
only showing top 2 rows



In [23]:
# The exchange rate was only needed to compute product_price_us; keep every other column
df_full_products = df_full_products.select(
    [name for name in df_full_products.columns if name != 'value_exchange']
)

In [24]:
# Final column types: price and rating as doubles, boolean flags as
# "true"/"false" strings. With ANSI mode off, an empty product_rate string
# casts to null.
target_types = {
    "product_price": DoubleType(),
    "product_rate": DoubleType(),
    "product_is_bestseller": StringType(),
    "product_is_prime": StringType(),
}
for column_name, column_type in target_types.items():
    df_full_products = df_full_products.withColumn(column_name, col(column_name).cast(column_type))

#Display Schema
df_full_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_is_bestseller: string (nullable = true)
 |-- product_is_prime: string (nullable = true)
 |-- product_price_currency: string (nullable = true)
 |-- product_rate: double (nullable = true)
 |-- product_price: double (nullable = true)
 |-- product_country: string (nullable = true)
 |-- product_price_us: double (nullable = true)



In [25]:
# Preview the first rows of the final products table
df_full_products.show(n=5)

+----------+---------------------+----------------+----------------------+------------+-------------+---------------+------------------+
|product_id|product_is_bestseller|product_is_prime|product_price_currency|product_rate|product_price|product_country|  product_price_us|
+----------+---------------------+----------------+----------------------+------------+-------------+---------------+------------------+
|B07FXP7HVS|                 true|           false|                     €|         4.1|        18.19|             IT|20.776556642672926|
|B077T5RQF7|                 true|            true|                     €|         4.4|        50.48|             IT| 57.65808572414124|
|B074VMTP68|                 true|            true|                     €|         4.4|        29.99|             DE| 34.25447683967899|
|B00QHC01C2|                false|            true|                     €|         4.5|        29.72|             NL|33.946083750425466|
|B01GFJWHZ0|                 true|       

In [26]:
#####################################################################
########insert table pr_products_standard_price into BigQuery Production ####
####################Products price US ###############################

In [27]:
# Overwrite the production table with the cleaned products. temporaryGcsBucket is
# the GCS bucket the BigQuery connector uses to stage the data for the load.
(
    df_full_products.write
    .format("bigquery")
    .options(table="becade_mgutierrez.pr_products_standard_price",
             temporaryGcsBucket="amazon_magdielgutierrez")
    .mode('overwrite')
    .save()
)