In [1]:
from pyspark.sql import SparkSession
import getpass
# Build (or reuse, via getOrCreate) a YARN-backed SparkSession with Hive support.
# The SQL warehouse directory is scoped to the current OS user's HDFS home.
username = getpass.getuser()
spark = (
    SparkSession.builder
    # '0' asks Spark for an ephemeral UI port instead of a fixed one.
    .config('spark.ui.port', '0')
    # NOTE(review): presumably needed for compatibility with the cluster's
    # (older) external shuffle service — confirm against the cluster setup.
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# DDL schema used for both orders reads; order_date is kept as a raw string.
orders_schema = (
    "order_id long , order_date string, "
    "customer_id long,order_status string"
)

In [3]:
# Orders CSV (the file name suggests ~1 GB) read with the explicit schema; no
# header option is set, so Spark's default (header=false) applies.
orders_df = (
    spark.read
    .format("csv")
    .schema(orders_schema)
    .load("/public/trendytech/orders/orders_1gb.csv")
)

In [4]:
# DDL schema for the customers files. The key column is named `customerid`
# (no underscore); the SQL join against orders relies on that name.
# NOTE(review): `pincode long` would drop leading zeros from postal codes
# such as "00725" — consider `string` if such codes occur in the data.
customer_schema = (
    "customerid long , customer_fname string , customer_lname string , "
    "user_name string,password string , address string, "
    "city string, state string, pincode long "
)

In [5]:
# Customers directory read as CSV with the explicit customer schema.
customers_df = (
    spark.read
    .format("csv")
    .schema(customer_schema)
    .load("/public/trendytech/retail_db/customers")
)

In [6]:
# Expose orders_df to spark.sql() under the view name `orders`.
orders_df.createOrReplaceTempView(name="orders")

In [7]:
# Expose customers_df to spark.sql() under the view name `customers`.
customers_df.createOrReplaceTempView(name="customers")

In [8]:
# Inner-join orders to customers in SQL, then write to the "noop" sink: the
# whole job executes but nothing is persisted, so only the join cost is measured.
(
    spark.sql("select * from orders inner join customers on orders.customer_id == customers.customerid")
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

In [9]:
# Second orders dataset, read with the same orders schema.
orders_new_df = (
    spark.read
    .format("csv")
    .schema(orders_schema)
    .load("/public/trendytech/retail_db/ordersnew")
)

In [10]:
# Rows per order_status, collected to the driver (the result has only a few
# distinct statuses). Left as the cell's last expression so it is displayed.
(
    orders_new_df
    .groupBy("order_status")
    .count()
    .collect()
)

[Row(order_status='PENDING_PAYMENT', count=5636250),
 Row(order_status='COMPLETE', count=46008801),
 Row(order_status='ON_HOLD', count=1424250),
 Row(order_status='PAYMENT_REVIEW', count=273375),
 Row(order_status='PROCESSING', count=3103125),
 Row(order_status='CLOSED', count=2833500),
 Row(order_status='SUSPECTED_FRAUD', count=584250),
 Row(order_status='PENDING', count=2853750),
 Row(order_status='CANCELED', count=535500)]

In [11]:
# Lookup table schema: order status name -> integer code.
mapping_schema = (
    "status string, "
    "code int"
)

In [12]:
# Pipe-delimited status -> code mapping file.
mapping_df = (
    spark.read
    .format("csv")
    .option("delimiter", "|")
    .schema(mapping_schema)
    .load("/public/trendytech/datasets/mapping_data")
)

In [13]:
# Print the first 20 rows (Spark's defaults, spelled out) of the mapping table.
mapping_df.show(n=20, truncate=True)

+---------------+----+
|         status|code|
+---------------+----+
|PENDING_PAYMENT|   1|
|       COMPLETE|   2|
|        ON_HOLD|   3|
| PAYMENT_REVIEW|   4|
|     PROCESSING|   5|
|         CLOSED|   6|
|SUSPECTED_FRAUD|   7|
|        PENDING|   8|
|       CANCELED|   9|
+---------------+----+



In [14]:
# Print a 20-row sample (Spark's defaults, spelled out); long values are truncated.
orders_new_df.show(n=20, truncate=True)

+--------+--------------------+-----------+------------+
|order_id|          order_date|customer_id|order_status|
+--------+--------------------+-----------+------------+
|    2480|2013-08-07 00:00:...|       3807|    COMPLETE|
|   30479|2014-01-30 00:00:...|       9265|    COMPLETE|
|    2481|2013-08-07 00:00:...|       2476|    COMPLETE|
|   30481|2014-01-30 00:00:...|       9240|    COMPLETE|
|    2483|2013-08-07 00:00:...|      10453|    COMPLETE|
|   30484|2014-01-30 00:00:...|       2876|    COMPLETE|
|    2484|2013-08-07 00:00:...|       9256|    COMPLETE|
|   30485|2014-01-30 00:00:...|       1069|    COMPLETE|
|    2488|2013-08-07 00:00:...|       1255|    COMPLETE|
|   30486|2014-01-30 00:00:...|       1151|    COMPLETE|
|    2491|2013-08-07 00:00:...|        247|    COMPLETE|
|   30487|2014-01-30 00:00:...|       6772|    COMPLETE|
|    2495|2013-08-07 00:00:...|       9011|    COMPLETE|
|   30489|2014-01-30 00:00:...|       5717|    COMPLETE|
|    2498|2013-08-07 00:00:...|

In [15]:
# Turn off automatic broadcast joins (a threshold of -1 disables them). The next
# join, which has no hint, then can't be auto-broadcast even though mapping_df is
# tiny. Presumably this is to observe the shuffle-based (sort-merge) plan instead.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [None]:
# Join every order to its status code and run the job through the "noop" sink
# (executes fully, persists nothing), so the join strategy's cost can be compared.
(
    orders_new_df
    .join(
        mapping_df,
        on=orders_new_df.order_status == mapping_df.status,
        how="inner",
    )
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)