In [0]:
%fs ls dbfs:/public/retail_db/orders

path,name,size,modificationTime
dbfs:/public/retail_db/orders/part-00000,part-00000,2999944,1695484798481
dbfs:/public/retail_db/orders/part-r-00000-990f5773-9005-49ba-b670-631286032674,part-r-00000-990f5773-9005-49ba-b670-631286032674,7477339,1695515039606


In [0]:
%sql
-- Peek at the raw schemas file through the text data source: one `value` row per line of the file.
select * from text.`dbfs:/public/retail_db/schemas.json`

value
{
"""departments"": ["
{
"""column_name"": ""department_id"","
"""data_type"": ""integer"","
"""column_position"": 1"
"},"
{
"""column_name"": ""department_name"","
"""data_type"": ""string"","


In [0]:
# Load the schemas file as one string: wholetext=True makes Spark return the
# file as a single row instead of one row per line.
schemas_text = (
    spark.read
    .text('dbfs:/public/retail_db/schemas.json', wholetext=True)
    .first()
    .value
)

In [0]:
import json

In [0]:
# Top-level keys of the schemas document, i.e. the dataset names it describes.
json.loads(schemas_text).keys()

dict_keys(['departments', 'categories', 'orders', 'products', 'customers', 'order_items'])

In [0]:
# Column metadata entries (column_name, data_type, column_position) for the 'orders' dataset.
column_details = json.loads(schemas_text)['orders']

In [0]:
# Display the column metadata ordered by declared position.
# NOTE(review): the displayed metadata types order_date as 'string' and
# order_customer_id as 'timestamp' — these look swapped in schemas.json; confirm.
sorted(
    column_details,
    key=lambda field: field['column_position'],
)

[{'column_name': 'order_id', 'data_type': 'integer', 'column_position': 1},
 {'column_name': 'order_date', 'data_type': 'string', 'column_position': 2},
 {'column_name': 'order_customer_id',
  'data_type': 'timestamp',
  'column_position': 3},
 {'column_name': 'order_status', 'data_type': 'string', 'column_position': 4}]

In [0]:
# Column names of the dataset, ordered by their declared position.
column_names = [
    field['column_name']
    for field in sorted(column_details, key=lambda field: field['column_position'])
]

In [0]:
%sql

-- Query the orders directory through the csv data source; without a header the columns are named _c0.._c3.
select * from csv.`dbfs:/public/retail_db/orders`

_c0,_c1,_c2,_c3
"{""order_id"":1","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":11599","""order_status"":""CLOSED""}"
"{""order_id"":2","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":256","""order_status"":""PENDING_PAYMENT""}"
"{""order_id"":3","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":12111","""order_status"":""COMPLETE""}"
"{""order_id"":4","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":8827","""order_status"":""CLOSED""}"
"{""order_id"":5","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":11318","""order_status"":""COMPLETE""}"
"{""order_id"":6","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":7130","""order_status"":""COMPLETE""}"
"{""order_id"":7","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":4530","""order_status"":""COMPLETE""}"
"{""order_id"":8","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":2911","""order_status"":""PROCESSING""}"
"{""order_id"":9","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":5657","""order_status"":""PENDING_PAYMENT""}"
"{""order_id"":10","""order_date"":""2013-07-25 00:00:00.0""","""order_customer_id"":5648","""order_status"":""PENDING_PAYMENT""}"


In [0]:
# Read the orders directory as CSV and apply the schema-derived column names.
# NOTE(review): the directory listing shows two part files and the previewed rows
# are JSON fragments, so this directory appears to mix a CSV file with a
# JSON-lines file — confirm which file(s) should actually be read here.
df = (
    spark.read
    .csv('dbfs:/public/retail_db/orders', inferSchema=True)
    .toDF(*column_names)
)

In [0]:
# Preview the DataFrame; long values are truncated in the printed table.
df.show()

+--------------+--------------------+--------------------+--------------------+
|      order_id|          order_date|   order_customer_id|        order_status|
+--------------+--------------------+--------------------+--------------------+
| {"order_id":1|"order_date":"201...|"order_customer_i...|"order_status":"C...|
| {"order_id":2|"order_date":"201...|"order_customer_i...|"order_status":"P...|
| {"order_id":3|"order_date":"201...|"order_customer_i...|"order_status":"C...|
| {"order_id":4|"order_date":"201...|"order_customer_i...|"order_status":"C...|
| {"order_id":5|"order_date":"201...|"order_customer_i...|"order_status":"C...|
| {"order_id":6|"order_date":"201...|"order_customer_i...|"order_status":"C...|
| {"order_id":7|"order_date":"201...|"order_customer_i...|"order_status":"C...|
| {"order_id":8|"order_date":"201...|"order_customer_i...|"order_status":"P...|
| {"order_id":9|"order_date":"201...|"order_customer_i...|"order_status":"P...|
|{"order_id":10|"order_date":"201...|"or

In [0]:
# Inspect the resulting schema.
# NOTE(review): every column comes back as StringType even though inferSchema=True
# was requested — presumably because of the JSON-formatted rows in the input; confirm.
df.schema

StructType([StructField('order_id', StringType(), True), StructField('order_date', StringType(), True), StructField('order_customer_id', StringType(), True), StructField('order_status', StringType(), True)])

In [0]:
from pyspark.sql.functions import count, col

In [0]:
# Number of orders per status, smallest count first.
(
    df
    .groupBy('order_status')
    .agg(count('*').alias('order_count'))
    .orderBy(col('order_count'))
    .show()
)

+--------------------+-----------+
|        order_status|order_count|
+--------------------+-----------+
|"order_status":"P...|        729|
|      PAYMENT_REVIEW|        729|
|"order_status":"C...|       1428|
|            CANCELED|       1428|
|"order_status":"S...|       1558|
|     SUSPECTED_FRAUD|       1558|
|"order_status":"O...|       3798|
|             ON_HOLD|       3798|
|"order_status":"C...|       7556|
|              CLOSED|       7556|
|"order_status":"P...|       7610|
|             PENDING|       7610|
|"order_status":"P...|       8275|
|          PROCESSING|       8275|
|"order_status":"P...|      15030|
|     PENDING_PAYMENT|      15030|
|"order_status":"C...|      22899|
|            COMPLETE|      22899|
+--------------------+-----------+



In [0]:
def get_columns(schemas_file, ds_name):
    """Return the column names of one dataset, ordered by column position.

    Args:
        schemas_file: Path of the JSON schemas file. It is read through the
            notebook's ``spark`` session as a single text value.
        ds_name: Key of the dataset inside the schemas document.

    Returns:
        list: The ``column_name`` of every entry listed under ``ds_name``,
        sorted ascending by ``column_position``.

    Raises:
        KeyError: If ``ds_name`` is not a key of the schemas document.
    """
    raw_schemas = spark.read.text(schemas_file, wholetext=True).first().value
    dataset_fields = json.loads(raw_schemas)[ds_name]
    # Sort a copy so the parsed entries are left in their original order.
    ordered_fields = list(dataset_fields)
    ordered_fields.sort(key=lambda field: field['column_position'])
    return [field['column_name'] for field in ordered_fields]

In [0]:
# Try the helper on the 'orders' dataset.
columns = get_columns('dbfs:/public/retail_db/schemas.json', 'orders')

In [0]:
# Dataset names of retail_db, in the order the loops below process them.
ds_list = [
    'departments', 'categories',
    'products', 'customers',
    'orders', 'order_items',
]

In [0]:
# Print the position-ordered column names of every dataset as a sanity check.
for ds in ds_list:
    columns = get_columns('dbfs:/public/retail_db/schemas.json', ds)
    print(columns)

['department_id', 'department_name']
['category_id', 'category_department_id', 'category_name']
['product_id', 'product_cateogry_id', 'product_name', 'product_description', 'product_price', 'product_image']
['customer_id', 'customer_fname', 'customer_lname', 'customer_email', 'customer_password', 'customer_street', 'customer_city', 'customer_state', 'customer_zipcode']
['order_id', 'order_date', 'order_customer_id', 'order_status']
['order_item_id', 'order_item_order_id', 'order_item_product_id', 'order_item_quantity', 'order_item_subtotal', 'order_item_product_price']


In [0]:
# Convert every dataset from CSV to Parquet.
#
# The Parquet copy is written under a separate base directory. Writing with
# mode('overwrite') back into the directory the CSV is read from would replace
# the input files while they are still the source of the lazily evaluated read,
# so the job would either fail or destroy the raw data.
source_base_dir = 'dbfs:/public/retail_db'
target_base_dir = 'dbfs:/public/retail_db_parquet'

for ds in ds_list:
    # Column names come from the schemas file because the CSV files have no header row.
    columns = get_columns(f'{source_base_dir}/schemas.json', ds)
    df = spark.read.csv(f'{source_base_dir}/{ds}', inferSchema=True).toDF(*columns)
    # Overwrite only the target copy, so the conversion can be re-run safely.
    df.write.mode('overwrite').parquet(f'{target_base_dir}/{ds}')