# PySpark ETL | Loading, Joining, and Writing Out Data with AWS Glue
This sample shows how to read, join, and write out data files on S3 from an Amazon SageMaker notebook (PySpark).

## 1. Checking the tables and schemas

### prd_feature

In [8]:
prd_feature = spark.read.parquet("s3://data-lake-bucket-imba/features/prd_feature_db")
prd_feature.show(10)
prd_feature.printSchema()
print("Count: ", prd_feature.count())

+----------+------------------+-----------------+-----------+-------------+
|product_id|prod_second_orders|prod_first_orders|prod_orders|prod_reorders|
+----------+------------------+-----------------+-----------+-------------+
|     39928|              7798|            14376|      50141|        35765|
|      4421|              2366|             5529|      12230|         6701|
|     28769|               590|             1687|       3224|         1537|
|     12879|               237|              495|       1293|          798|
|     33700|                59|              360|        456|           96|
|     48015|               432|              837|       2665|         1828|
|     47144|              5343|             9893|      34583|        24690|
|     21299|                19|               75|        121|           46|
|     14148|                37|              346|        402|           56|
|     35851|               594|             1300|       3665|         2365|
+----------+------------------+-----------------+-----------+-------------+

### up_features

In [11]:
up_features = spark.read.parquet("s3://data-lake-bucket-imba/features/up_features_db")
up_features.show(10)
up_features.printSchema()
print("Count: ", up_features.count())

+-------+----------+---------+-------------+--------------+------------------------+
|user_id|product_id|up_orders|up_last_order|up_first_order|up_average_cart_position|
+-------+----------+---------+-------------+--------------+------------------------+
| 167303|     47717|        1|            4|             4|                     3.0|
| 167315|     42342|        9|           17|             3|       17.77777777777778|
| 167315|     34276|        2|            6|             1|                    13.0|
| 167315|     25043|        1|            6|             6|                    15.0|
| 167320|     37646|        2|            2|             1|                    18.5|
| 167340|     24852|        6|           35|             3|       6.333333333333333|
| 167350|     42265|        3|           42|            12|       7.666666666666667|
| 167358|     11844|        2|           16|            15|                    12.5|
| 167358|     24841|       11|           31|            10|      

### user_features_1

In [12]:
user_features_1 = spark.read.parquet("s3://data-lake-bucket-imba/features/user_feature1_db")
user_features_1.show(10)
user_features_1.printSchema()
print("Count: ", user_features_1.count())

+-------+-----------+-----------+--------------------------+
|user_id|user_orders|user_period|user_mean_days_since_prior|
+-------+-----------+-----------+--------------------------+
| 167495|         46|       5924|         5.294012511170688|
| 167511|         12|        775|         8.516483516483516|
| 167782|         15|       2496|          17.9568345323741|
| 167815|         16|       1885|        16.830357142857142|
| 168242|          8|        994|        10.923076923076923|
| 168508|         48|       4890|         7.761904761904762|
| 168654|          8|        887|        17.392156862745097|
| 168764|         50|       6004|         7.277575757575757|
| 168832|         31|       1718|         9.988372093023257|
| 168854|         99|       3502|         4.981507823613087|
+-------+-----------+-----------+--------------------------+
only showing top 10 rows

root
 |-- user_id: long (nullable = true)
 |-- user_orders: integer (nullable = true)
 |-- user_period: integer (nullabl

### user_features_2

In [13]:
user_features_2 = spark.read.parquet("s3://data-lake-bucket-imba/features/user_features_2_db")
user_features_2.show(10)
user_features_2.printSchema()
print("Count: ", user_features_2.count())

+-------+-------------------+----------------------+-------------------+
|user_id|user_total_products|user_distinct_products| user_reorder_ratio|
+-------+-------------------+----------------------+-------------------+
| 144182|                146|                   103| 0.3049645390070922|
| 144242|               1486|                   313| 0.8012295081967213|
| 144749|                 66|                    61|0.12195121951219512|
| 145000|                910|                   218| 0.7899543378995434|
| 145751|                155|                    69|  0.581081081081081|
| 145773|                370|                    99| 0.7465564738292011|
| 143035|                 51|                    28| 0.5348837209302325|
| 143598|                112|                    81|0.34444444444444444|
|  78081|                720|                   194| 0.7387640449438202|
|  78800|                518|                   196| 0.6264591439688716|
+-------+-------------------+----------------------+-------------------+
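
The four cells above repeat the same read, show, schema, and count steps. A minimal sketch of how that pattern could be factored out is given here. The helper names `load_features` and `preview` are hypothetical (they are not part of the original notebook), and the code assumes the `spark` session that the notebook kernel already provides.

```python
def load_features(spark, base_path, table_names):
    """Read each Parquet table under base_path into a dict keyed by table name."""
    return {
        name: spark.read.parquet(f"{base_path}/{name}")
        for name in table_names
    }


def preview(df, n=10):
    """Print the first n rows and the schema, then return the row count."""
    df.show(n)
    df.printSchema()
    return df.count()


# Example use inside the notebook (assumes `spark` exists in the session):
# tables = load_features(
#     spark,
#     "s3://data-lake-bucket-imba/features",
#     ["prd_feature_db", "up_features_db", "user_feature1_db", "user_features_2_db"],
# )
# for name, df in tables.items():
#     print(name, "Count: ", preview(df))
```

Note that `count()` triggers a full scan of each table, so on large tables it may be worth calling it only when the row count is actually needed.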

## 2. Join Tables 

* First, join `up_features` and `prd_feature` on "product_id"
* Next, join the result with `user_features_1` on "user_id"
* Then, join the result with `user_features_2` on "user_id"

In [44]:
joinDF = up_features.join(prd_feature, "product_id").join(user_features_1, "user_id").join(user_features_2, "user_id")
joinDF.show(10)
joinDF.printSchema()
print("Count: ", joinDF.count())

+-------+----------+---------+-------------+--------------+------------------------+------------------+-----------------+-----------+-------------+-----------+-----------+--------------------------+-------------------+----------------------+-------------------+
|user_id|product_id|up_orders|up_last_order|up_first_order|up_average_cart_position|prod_second_orders|prod_first_orders|prod_orders|prod_reorders|user_orders|user_period|user_mean_days_since_prior|user_total_products|user_distinct_products| user_reorder_ratio|
+-------+----------+---------+-------------+--------------+------------------------+------------------+-----------------+-----------+-------------+-----------+-----------+--------------------------+-------------------+----------------------+-------------------+
| 167303|     47717|        1|            4|             4|                     3.0|               732|             1787|       4604|         2817|          7|        297|         6.906976744186046|                
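
The three-step join described above can also be written as a fold over a list of `(dataframe, key)` pairs, which may be easier to extend when more feature tables are added. This is only a sketch: `join_all` is a hypothetical helper, not something from the original notebook. It relies solely on `DataFrame.join(other, on)`, which performs an inner join by default, so rows without a match in every table are dropped.

```python
from functools import reduce


def join_all(base, joins):
    """Join `base` with each (dataframe, key) pair, in the order given.

    Each step calls left.join(right, key); with PySpark DataFrames this is
    an inner join on the named column.
    """
    return reduce(lambda left, pair: left.join(pair[0], pair[1]), joins, base)


# Example use inside the notebook (same joins as the cell above):
# joinDF = join_all(
#     up_features,
#     [
#         (prd_feature, "product_id"),
#         (user_features_1, "user_id"),
#         (user_features_2, "user_id"),
#     ],
# )
```

Because the joins are inner joins, comparing `joinDF.count()` with `up_features.count()` is a quick way to see whether any user-product rows were lost for lack of a matching product or user.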

## 3. Writing to S3 Bucket
Use `repartition(1)` to collapse the join result into a single partition, so that Spark writes one CSV part file instead of one file per partition, and write it out to S3.

In [55]:
singleDF = joinDF.repartition(1)
singleDF.write.csv("s3://data-lake-bucket-imba/features/finaltable", header=True)

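
The write step can be wrapped in a small function so that the save mode is explicit. This is a sketch under assumptions: `write_single_csv` is a hypothetical helper, and its default mode mirrors Spark's own default (`errorifexists`), under which the write fails if the target path already exists. Passing `mode="overwrite"` replaces existing output, which is useful when re-running the notebook.

```python
def write_single_csv(df, path, mode="errorifexists"):
    """Collapse df to one partition and write it as CSV with a header row."""
    (
        df.repartition(1)      # one partition -> one part file
        .write
        .mode(mode)            # "errorifexists" (Spark default) or "overwrite"
        .csv(path, header=True)
    )


# Example use inside the notebook:
# write_single_csv(
#     joinDF,
#     "s3://data-lake-bucket-imba/features/finaltable",
#     mode="overwrite",
# )
```

Spark treats the target path as a directory: the data lands in a `part-*.csv` file inside `finaltable/`, not in a single file named `finaltable`. Collapsing to one partition also funnels all rows through a single task, so this pattern suits results that fit comfortably on one executor.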