In [None]:
import pyspark
from pyspark.sql import SparkSession
# Build the notebook's Spark session; getOrCreate() hands back an already
# active session instead of starting a second one when the cell is re-run.
spark = (
    SparkSession.builder
    .appName("retail_pipeline")
    .getOrCreate()
)

In [86]:
# Schema-on-read (letting Spark infer the column types via inferSchema=True) is best for ad hoc queries.
# dfs = spark.read.csv("pyspark/data/Amazon Sale Report.csv", header=True, inferSchema=True)
# dfs.printSchema()

In [None]:
# defining our schema explicitly
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, DateType
# Explicit schema for "Amazon Sale Report.csv": 22 nullable columns, all read
# as strings except index/Qty (integer) and Amount (double). Date is kept as a
# plain string here; nothing in this cell parses it into a date type.
# NOTE(review): presumably Spark applies a user-supplied CSV schema by position,
# so the order and number of fields must mirror the file's header row —
# confirm against the CSV before adding or removing fields.
amazon_sale_report_schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("Order ID", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Fulfilment", StringType(), True),
    StructField("Sales Channel", StringType(), True),
    # Sits between Sales Channel and Style, as shown by the schema printed
    # after the load; leaving it out would misalign every later column.
    StructField("Ship-Service-level", StringType(), True),
    StructField("Style", StringType(), True),
    StructField("SKU", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("Size", StringType(), True),
    StructField("ASIN", StringType(), True),
    StructField("Courier Status", StringType(), True),
    StructField("Qty", IntegerType(), True),
    StructField("currency", StringType(), True),
    StructField("Amount", DoubleType(), True),
    StructField("Ship-city", StringType(), True),
    StructField("Ship-state", StringType(), True),
    StructField("Ship-postal-code", StringType(), True),
    StructField("Ship-country", StringType(), True),
    StructField("B2B", StringType(), True),
    StructField("Fulfilled-by", StringType(), True),
])

In [88]:
# Load the sales report using the hand-written schema (no type inference),
# treating the first line of the file as the header row.
dfs = (
    spark.read
    .schema(amazon_sale_report_schema)
    .option("header", True)
    .csv("./pyspark/data/Amazon Sale Report.csv")
)
# Print the resulting column names and types as a quick sanity check.
dfs.printSchema()

root
 |-- index: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Fulfilment: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Ship-Service-level: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- SKU: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- ASIN: string (nullable = true)
 |-- Courier Status: string (nullable = true)
 |-- Qty: integer (nullable = true)
 |-- currency: string (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Ship-city: string (nullable = true)
 |-- Ship-state: string (nullable = true)
 |-- Ship-postal-code: string (nullable = true)
 |-- Ship-country: string (nullable = true)
 |-- B2B: string (nullable = true)
 |-- Fulfilled-by: string (nullable = true)



In [89]:
# Preview the first five rows of the loaded report.
dfs.show(n=5)

+-----+-------------------+--------+--------------------+----------+-------------+------------------+-------+---------------+-------------+----+----------+--------------+---+--------+------+-----------+-----------+----------------+------------+--------------------+------------+
|index|           Order ID|    Date|              Status|Fulfilment|Sales Channel|Ship-Service-level|  Style|            SKU|     Category|Size|      ASIN|Courier Status|Qty|currency|Amount|  Ship-city| Ship-state|Ship-postal-code|Ship-country|                 B2B|Fulfilled-by|
+-----+-------------------+--------+--------------------+----------+-------------+------------------+-------+---------------+-------------+----+----------+--------------+---+--------+------+-----------+-----------+----------------+------------+--------------------+------------+
|    0|405-8078784-5731545|04-30-22|           Cancelled|  Merchant|    Amazon.in|          Standard| SET389| SET389-KR-NP-S|          Set|   S|B09KXVBD7Z|        

In [90]:
# List the databases registered in this session's catalog; as the last
# expression in the cell, the returned list is what gets displayed.
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/home/jovyan/spark-warehouse')]