##Read CSV files into Dataframe

In [0]:
from pyspark.sql import SparkSession

In [0]:
spark = (SparkSession.builder.appName("dataPrep").getOrCreate())

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
schema = StructType([
  StructField("Order ID", StringType(), True),
  StructField("Product", StringType(), True),
  StructField("Quantity Order", StringType(), True),
  StructField("Price Each", StringType(), True),
  StructField("Order Date", StringType(), True),
  StructField("Purchase Address", StringType(), True)
])

In [0]:
sales_data_fpath = "dbfs:/FileStore/salesdata/raw"
sales_raw_df = (spark.read.format("csv")
                 .option("header", True)
                 .schema(schema)
                 .load(sales_data_fpath))

In [0]:
sales_raw_df.show(5)

In [0]:
sales_raw_df.printSchema()

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")

In [0]:
spark.sql("use sales_db")

In [0]:
# spark.sql("drop table sales_raw")
spark.sql(""" create table if not exists sales_raw(
      OrderId string,
      Product string,
      QuantityOrdered string,
      PriceEach string,
      OrderDate string,
      PurchaseAddress string
)""")

##Insert dataFrame data to Sales raw table

In [0]:
sales_raw_df.createOrReplaceTempView("temp_sales_raw")

In [0]:
spark.sql("select * from temp_sales_raw").show(5, False)

In [0]:
spark.sql("describe temp_sales_raw").show()

In [0]:
spark.sql("describe sales_db.sales_raw").show()

In [0]:
spark.sql("""INSERT overwrite table sales_db.sales_raw
      SELECT `Order ID`, `Product`, `Quantity Order`, `Price Each`, `Order Date`, `Purchase Address`
      FROM temp_sales_raw""")

In [0]:
spark.sql("SELECT * FROM sales_db.sales_raw").show(5)

In [0]:
temp_sales_df = spark.sql("""select cast(OrderID as int) as OrderId,
                                Product,
                                QuantityOrdered,
                                PriceEach,
                                to_timestamp(OrderDate, 'MM/dd/yy HH:mm') as OrderDate,
                                PurchaseAddress,
                                split(purchaseaddress, ",")[1] as city,
                                split(split(purchaseaddress, ",")[2], " ")[1] as state,
                                year(to_timestamp(OrderDate, 'MM/dd/yy HH:mm')) as ReportYear,
                                month(to_timestamp(OrderDate, 'MM/dd/yy HH:mm')) as Month
                             from sales_raw
                             where orderid is not null
                               and orderid != "Order ID"

""")

In [0]:
temp_sales_df.show(5)

In [0]:
temp_sales_df.createOrReplaceTempView("temp_sales")

In [0]:
spark.sql("describe temp_sales").show()

In [0]:
spark.sql("""
      create table if not exists sales(
      OrderID int,
      Product string,
      QuantityOrdered int,
      PriceEach int,
      OrderDate timestamp,
      PurchaseAddress string,
      city string,
      state string,
      ReportYear int,
      Month int
    )
    using Parquet
    partitioned by (ReportYear, Month)
    options('compression' = 'snappy')
    location 'dbfs:/FileStore/salesdata/published'
    """)

In [0]:
#spark.sql("drop table sales")
spark.sql("describe sales").show()

In [0]:
spark.sql("""
    insert into sales
    select
      OrderID,
      Product,
      cast(QuantityOrdered as int),
      cast(PriceEach as float),
      OrderDate,
      PurchaseAddress,
      city,
      state,
      ReportYear,
      Month
    from temp_sales
""")

In [0]:
%sql
select * from sales limit 5

OrderID,Product,QuantityOrdered,PriceEach,OrderDate,PurchaseAddress,city,state,ReportYear,Month
141234,iPhone,1,700,2019-01-22T21:25:00.000+0000,"944 Walnut St, Boston, MA 02215",Boston,MA,2019,1
141235,Lightning Charging Cable,1,14,2019-01-28T14:15:00.000+0000,"185 Maple St, Portland, OR 97035",Portland,OR,2019,1
141236,Wired Headphones,2,11,2019-01-17T13:33:00.000+0000,"538 Adams St, San Francisco, CA 94016",San Francisco,CA,2019,1
141237,27in FHD Monitor,1,149,2019-01-05T20:33:00.000+0000,"738 10th St, Los Angeles, CA 90001",Los Angeles,CA,2019,1
141238,Wired Headphones,1,11,2019-01-25T11:59:00.000+0000,"387 10th St, Austin, TX 73301",Austin,TX,2019,1
