In [18]:
# ETL stands for 

# Extract: extract the data from the different sources

# Transform: Transform the unstructured data into structured data. Transformations like cleaning, manipulation, etc.

# Load : Load the transformed data into a location or date warehouse.


In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,concat,lit,floor,rand
spark = SparkSession.builder.appName("ETLPractice").getOrCreate()
source_path = "orders.csv"
target_path = "order_result.csv"
load_data = spark.read.csv("orders.csv",header = True, inferSchema = True) 


In [20]:
load_data.columns
load_data.show(5)


+-------+----------+----------+----------+-----------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|
+-------+----------+----------+----------+-----------+
|      1|      john|       doe|         5|     active|
|      2|      jane|     smith|         8|     active|
|      3|   micheal|   jhonson|         3|   inactive|
|      4|      abhi|   wiliams|         1|     active|
|      5|       ram|     brown|         4|   inactive|
+-------+----------+----------+----------+-----------+
only showing top 5 rows



In [21]:
 #Transformation 1: Concatenate First and Last Names
load_data = load_data.withColumn('full_name', concat(col('cust_fname'), lit(' '), col('cust_lname')))
load_data.show(10)

+-------+----------+----------+----------+-----------+---------------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|
+-------+----------+----------+----------+-----------+---------------+
|      1|      john|       doe|         5|     active|       john doe|
|      2|      jane|     smith|         8|     active|     jane smith|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|
|      4|      abhi|   wiliams|         1|     active|   abhi wiliams|
|      5|       ram|     brown|         4|   inactive|      ram brown|
|      6|     emily|  anderson|         2|     active| emily anderson|
|      7|   william|     jones|        10|     active|  william jones|
|      8|     susan|     davis|         7|   inactive|    susan davis|
|      9|     david|    miller|         9|     active|   david miller|
|     10|      sara|     moore|         2|   inactive|     sara moore|
+-------+----------+----------+----------+-----------+---------------+
only s

In [22]:
# Transformation 2: Calculate Net Salary (subtract 10% as taxes)
load_data = load_data.withColumn('net_salary', floor(lit(10000) + rand() * lit(50)))
load_data.show(10)

+-------+----------+----------+----------+-----------+---------------+----------+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|net_salary|
+-------+----------+----------+----------+-----------+---------------+----------+
|      1|      john|       doe|         5|     active|       john doe|     10038|
|      2|      jane|     smith|         8|     active|     jane smith|     10034|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|     10020|
|      4|      abhi|   wiliams|         1|     active|   abhi wiliams|     10020|
|      5|       ram|     brown|         4|   inactive|      ram brown|     10021|
|      6|     emily|  anderson|         2|     active| emily anderson|     10003|
|      7|   william|     jones|        10|     active|  william jones|     10009|
|      8|     susan|     davis|         7|   inactive|    susan davis|     10015|
|      9|     david|    miller|         9|     active|   david miller|     10010|
|     10|      s

In [23]:
#adding age column
load_data = load_data.withColumn('age', floor(lit(20) + rand() * lit(31)))
load_data.show(10)

+-------+----------+----------+----------+-----------+---------------+----------+---+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|net_salary|age|
+-------+----------+----------+----------+-----------+---------------+----------+---+
|      1|      john|       doe|         5|     active|       john doe|     10038| 23|
|      2|      jane|     smith|         8|     active|     jane smith|     10034| 40|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|     10020| 45|
|      4|      abhi|   wiliams|         1|     active|   abhi wiliams|     10020| 21|
|      5|       ram|     brown|         4|   inactive|      ram brown|     10021| 29|
|      6|     emily|  anderson|         2|     active| emily anderson|     10003| 38|
|      7|   william|     jones|        10|     active|  william jones|     10009| 35|
|      8|     susan|     davis|         7|   inactive|    susan davis|     10015| 42|
|      9|     david|    miller|         9|     active|

In [24]:
# # Transformation 3: Filter by Age (age >= 30)
load_data = load_data.filter(col('age')>= 30)
load_data.show()


+-------+----------+----------+----------+-----------+---------------+----------+---+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|net_salary|age|
+-------+----------+----------+----------+-----------+---------------+----------+---+
|      2|      jane|     smith|         8|     active|     jane smith|     10034| 40|
|      3|   micheal|   jhonson|         3|   inactive|micheal jhonson|     10020| 45|
|      6|     emily|  anderson|         2|     active| emily anderson|     10003| 38|
|      7|   william|     jones|        10|     active|  william jones|     10009| 35|
|      8|     susan|     davis|         7|   inactive|    susan davis|     10015| 42|
|     10|      sara|     moore|         2|   inactive|     sara moore|     10002| 41|
|     11|     james|    tailor|         5|   inactive|   james tailor|     10045| 31|
|     12|    olivia|    wilson|         3|   inactive|  olivia wilson|     10002| 44|
|     13|    robert|     evans|        11|     active|

In [25]:
# Transformation 4: Group by Age and Calculate Average Salary
avg_salary_by_age = load_data.groupBy('age').agg({'net_salary' :'avg'}).withColumnRenamed('avg(salary)', 'avg_salary')
avg_salary_by_age.show()

+---+---------------+
|age|avg(net_salary)|
+---+---------------+
| 43|        10013.0|
| 31|        10045.0|
| 41|        10006.5|
| 48|        10014.5|
| 44|        10002.0|
| 35|        10009.0|
| 38|        10008.5|
| 42|        10015.0|
| 46|        10041.0|
| 40|        10034.0|
| 45|        10020.0|
| 47|        10039.0|
+---+---------------+



In [26]:
load_data = load_data.orderBy("age")
load_data.show()

+-------+----------+----------+----------+-----------+---------------+----------+---+
|cust_id|cust_fname|cust_lname|cust_order|cust_status|      full_name|net_salary|age|
+-------+----------+----------+----------+-----------+---------------+----------+---+
|     11|     james|    tailor|         5|   inactive|   james tailor|     10045| 31|
|      7|   william|     jones|        10|     active|  william jones|     10009| 35|
|      6|     emily|  anderson|         2|     active| emily anderson|     10003| 38|
|     14|      emma|    thomas|        29|     active|    emma thomas|     10014| 38|
|      2|      jane|     smith|         8|     active|     jane smith|     10034| 40|
|     10|      sara|     moore|         2|   inactive|     sara moore|     10002| 41|
|     16|  isabella|     white|         6|   inactive| isabella white|     10011| 41|
|      8|     susan|     davis|         7|   inactive|    susan davis|     10015| 42|
|     17|    joseph|    martin|         4|   inactive|

In [None]:
# Save the transformed data to an external CSV file
load_data.write.csv(target_path, mode='overwrite', header=True)