In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp, hour, minute, year, month, day
from pyspark.sql.types import StringType, DoubleType, IntegerType, DateType, TimestampType

In [6]:
# Create (or reuse) the Spark session for this notebook on a local master
# that uses every available core.
# NOTE(review): with a local[*] master "spark.executor.instances" presumably
# has no effect -- confirm and drop it if so.
# NOTE(review): the LEGACY time parser policy is presumably needed by the
# date/time parsing done further down -- confirm before removing it.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.executor.instances", "5")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

In [7]:
# Load the raw supermarket sales CSV, using its first line as the column names.
df = spark.read.csv("spark-warehouse/landing/supermarket_sales.csv", header=True)

In [9]:

# List the column names exactly as they were read from the CSV header.
df.columns


['Invoice ID',
 'Branch',
 'City',
 'Customer type',
 'Gender',
 'Product line',
 'Unit price',
 'Quantity',
 'Tax 5%',
 'Total',
 'Date',
 'Time',
 'Payment',
 'cogs',
 'gross margin percentage',
 'gross income',
 'Rating']

In [11]:

# Print the schema of the freshly loaded frame (the read above requests no
# schema inference, and the output lists every column as a string).
df.printSchema()

root
 |-- Invoice ID: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Customer type: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- Unit price: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Tax 5%: string (nullable = true)
 |-- Total: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Payment: string (nullable = true)
 |-- cogs: string (nullable = true)
 |-- gross margin percentage: string (nullable = true)
 |-- gross income: string (nullable = true)
 |-- Rating: string (nullable = true)



In [12]:
# Preview a single raw row to see the value formats (e.g. Date and Time).
df.show(1)

+-----------+------+------+-------------+------+-----------------+----------+--------+-------+--------+--------+-----+-------+------+-----------------------+------------+------+
| Invoice ID|Branch|  City|Customer type|Gender|     Product line|Unit price|Quantity| Tax 5%|   Total|    Date| Time|Payment|  cogs|gross margin percentage|gross income|Rating|
+-----------+------+------+-------------+------+-----------------+----------+--------+-------+--------+--------+-----+-------+------+-----------------------+------------+------+
|750-67-8428|     A|Yangon|       Member|Female|Health and beauty|     74.69|       7|26.1415|548.9715|1/5/2019|13:08|Ewallet|522.83|            4.761904762|     26.1415|   9.1|
+-----------+------+------+-------------+------+-----------------+----------+--------+-------+--------+--------+-----+-------+------+-----------------------+------------+------+
only showing top 1 row



In [None]:

# NOTE(review): this cell has no execution count and the same mapping is
# defined again in the following cell -- one of the two should be deleted.
# Maps each source CSV column name to (new snake_case name, Spark type to cast to).
column_rename_and_cast = {
    'Invoice ID': ('invoice_id', StringType()),
    'Branch': ('branch', StringType()),
    'City': ('city', StringType()),
    'Customer type': ('customer_type', StringType()),
    'Gender': ('gender', StringType()),
    'Product line': ('product_line', StringType()),
    'Unit price': ('unit_price', DoubleType()),
    'Quantity': ('quantity', IntegerType()),
    'Tax 5%': ('tax_5_percent', DoubleType()),
    'Total': ('total', DoubleType()),
    'Date': ('date', StringType()), # Casting the date directly can cause problems; better to use helper functions
    'Time': ('time', StringType()), # Casting the timestamp directly can cause problems; better to use helper functions
    'Payment': ('payment', StringType()),
    'cogs': ('cogs', DoubleType()),
    'gross margin percentage': ('gross_margin_percentage', DoubleType()),
    'gross income': ('gross_income', DoubleType()),
    'Rating': ('rating', DoubleType())
}


In [13]:
# Source CSV column name -> (snake_case column name, Spark type to cast to).
# Each entry below is (source name, target name, type constructor); the type
# is instantiated once per column when the mapping is built.
# "Date" and "Time" stay strings here: casting them directly can cause
# problems, so they are meant to be parsed with dedicated helper functions.
column_rename_and_cast = {
    source_name: (target_name, type_factory())
    for source_name, target_name, type_factory in [
        ("Invoice ID", "invoice_id", StringType),
        ("Branch", "branch", StringType),
        ("City", "city", StringType),
        ("Customer type", "customer_type", StringType),
        ("Gender", "gender", StringType),
        ("Product line", "product_line", StringType),
        ("Unit price", "unit_price", DoubleType),
        ("Quantity", "quantity", IntegerType),
        ("Tax 5%", "tax_5_percent", DoubleType),
        ("Total", "total", DoubleType),
        ("Date", "date", StringType),
        ("Time", "time", StringType),
        ("Payment", "payment", StringType),
        ("cogs", "cogs", DoubleType),
        ("gross margin percentage", "gross_margin_percentage", DoubleType),
        ("gross income", "gross_income", DoubleType),
        ("Rating", "rating", DoubleType),
    ]
}

In [14]:

# Rename and cast all mapped columns in ONE projection.
# Calling withColumnRenamed/withColumn once per column in a loop stacks a new
# projection on the plan for every call, which the PySpark docs warn can
# produce very large plans; a single select() avoids that.
# Column order is preserved, and any column that is not a key of
# column_rename_and_cast is passed through unchanged, so re-running this cell
# after the rename is a no-op rather than an error.
df = df.select([
    col(source_name)
    .cast(column_rename_and_cast[source_name][1])
    .alias(column_rename_and_cast[source_name][0])
    if source_name in column_rename_and_cast
    else col(source_name)
    for source_name in df.columns
])

In [15]:
# Check that the columns now carry the new names and the cast types.
df.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- branch: string (nullable = true)
 |-- city: string (nullable = true)
 |-- customer_type: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- product_line: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- tax_5_percent: double (nullable = true)
 |-- total: double (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- payment: string (nullable = true)
 |-- cogs: double (nullable = true)
 |-- gross_margin_percentage: double (nullable = true)
 |-- gross_income: double (nullable = true)
 |-- rating: double (nullable = true)



In [16]:

# Print the physical plan to see how the renames and casts are executed.
df.explain()

== Physical Plan ==
*(1) Project [Invoice ID#1174 AS invoice_id#1313, Branch#1175 AS branch#1349, City#1176 AS city#1385, Customer type#1177 AS customer_type#1421, Gender#1178 AS gender#1457, Product line#1179 AS product_line#1493, cast(Unit price#1180 as double) AS unit_price#1529, cast(Quantity#1181 as int) AS quantity#1565, cast(Tax 5%#1182 as double) AS tax_5_percent#1601, cast(Total#1183 as double) AS total#1637, Date#1184 AS date#1673, Time#1185 AS time#1709, Payment#1186 AS payment#1745, cast(cogs#1187 as double) AS cogs#1781, cast(gross margin percentage#1188 as double) AS gross_margin_percentage#1817, cast(gross income#1189 as double) AS gross_income#1853, cast(Rating#1190 as double) AS rating#1889]
+- FileScan csv [Invoice ID#1174,Branch#1175,City#1176,Customer type#1177,Gender#1178,Product line#1179,Unit price#1180,Quantity#1181,Tax 5%#1182,Total#1183,Date#1184,Time#1185,Payment#1186,cogs#1187,gross margin percentage#1188,gross income#1189,Rating#1190] Batched: false, DataFi

In [17]:
# Preview the first rows after renaming and casting.
df.show()


+-----------+------+---------+-------------+------+--------------------+----------+--------+-------------+--------+---------+-----+-----------+------+-----------------------+------------+------+
| invoice_id|branch|     city|customer_type|gender|        product_line|unit_price|quantity|tax_5_percent|   total|     date| time|    payment|  cogs|gross_margin_percentage|gross_income|rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------------+--------+---------+-----+-----------+------+-----------------------+------------+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|      26.1415|548.9715| 1/5/2019|13:08|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|         3.82|   80.22| 3/8/2019|10:29|       Cash|  76.4|            4.761904762|        3.82|   9.6|
|631-41-3108|     A|   Ya

In [18]:
# Split the textual "date" and "time" columns into integer parts and drop the
# original strings.
#
# The dates in this file are not zero-padded (e.g. "1/5/2019"), so the pattern
# is "M/d/yyyy", which accepts both one- and two-digit months/days. The
# previous "MM/dd/yyyy" pattern only matched such values through the leniency
# of the LEGACY time parser.
# Each source column is parsed once and the parsed expression is reused.
sale_date = to_date(col("date"), "M/d/yyyy")
sale_time = to_timestamp(col("time"), "HH:mm")

df = (
    df.withColumn("year", year(sale_date))
    .withColumn("month", month(sale_date))
    .withColumn("day", day(sale_date))
    .withColumn("hour", hour(sale_time))
    .withColumn("minute", minute(sale_time))
    .drop("date", "time")
)

In [19]:
# Confirm that date/time were replaced by the integer year/month/day/hour/minute columns.
df.printSchema()

root
 |-- invoice_id: string (nullable = true)
 |-- branch: string (nullable = true)
 |-- city: string (nullable = true)
 |-- customer_type: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- product_line: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- tax_5_percent: double (nullable = true)
 |-- total: double (nullable = true)
 |-- payment: string (nullable = true)
 |-- cogs: double (nullable = true)
 |-- gross_margin_percentage: double (nullable = true)
 |-- gross_income: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



In [20]:
# Preview the first rows with the derived date and time parts.
df.show()

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------------+--------+-----------+------+-----------------------+------------+------+----+-----+---+----+------+
| invoice_id|branch|     city|customer_type|gender|        product_line|unit_price|quantity|tax_5_percent|   total|    payment|  cogs|gross_margin_percentage|gross_income|rating|year|month|day|hour|minute|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------------+--------+-----------+------+-----------------------+------------+------+----+-----+---+----+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|      26.1415|548.9715|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|2019|    1|  5|  13|     8|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|         3.82|   80.22|       Cash|  76.4|            4.761904762|        3.82|   9.

In [None]:
# Persist the cleaned frame as the bronze table, stored as Parquet under
# bronze/supermarket_sales and fully replaced on every run.
#
# Partition by calendar date only. partitionBy creates one directory per
# distinct combination of values, so also partitioning by hour and minute
# would yield roughly one directory (and one tiny file) per sale.
# "hour" and "minute" are still written, as regular data columns.
df.write.saveAsTable(
    "supermarket_sales_bronze",
    format="parquet",
    mode="overwrite",
    partitionBy=["year", "month", "day"],
    path="bronze/supermarket_sales",
)