In [0]:
###########################  Module Import  #####################################
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
# Reuse the active Spark session if one exists (getOrCreate), otherwise start
# a new one registered under the application name 'Pyspark_Project'.
spark = (
    SparkSession.builder
    .appName('Pyspark_Project')
    .getOrCreate()
)

In [0]:
##############################  BAMBOO File Format################################################
file1_location = '/FileStore/shared_uploads/yoganshu.raikhere2@mindtree.com/BambooPlantation.txt'

# Explicit schema for the pipe-delimited plantation files. Area is declared as
# a string because it is cleaned with a regex before being used numerically.
# NOTE: the rubber cell further down reads its file with this same `schema`.
schema = StructType([
    StructField("Serial_Number", IntegerType(), True),
    StructField("District", StringType(), True),
    StructField("Area", StringType(), True),
    StructField("Production", IntegerType(), True),
    StructField("Productivity", FloatType(), True),
    StructField("Plantation Type", StringType(), True),
])

# The file is read without a header row, using "|" as the field separator.
bamboo_df = spark.read.option("delimiter", "|").csv(file1_location, schema=schema, header=False)

# Keep only ASCII letters in District.
bamboo_df = bamboo_df.withColumn("District", regexp_replace(col("District"), "[^a-zA-Z]*", ""))

# Keep only digits in Area.
bamboo_df = bamboo_df.withColumn("Area", regexp_replace(col("Area"), "[^0-9]", ""))

# Tag every row with its plantation type so the frames can be told apart
# once they are unioned.
bamboo_df = bamboo_df.withColumn("Plantation Type", F.lit("Bamboo"))

# Productivity is derived as Production / Area (Area is a string column here;
# this relies on Spark casting it implicitly for the division).
bamboo_df = bamboo_df.withColumn("Productivity", bamboo_df.Production / bamboo_df.Area)

# DataFrames are immutable: na.fill() returns a NEW frame, so the result has
# to be assigned back. Previously the return value was discarded and the fill
# had no effect.
bamboo_df = bamboo_df.na.fill('0', subset=['Area'])

bamboo_df.printSchema()
bamboo_df.show()

# Summary statistics: max / min production and total area.
for agg_column, agg_func in (("Production", "max"), ("Production", "min"), ("Area", "sum")):
    bamboo_df.agg({agg_column: agg_func}).show()

root
 |-- Serial_Number: integer (nullable = true)
 |-- District: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Production: integer (nullable = true)
 |-- Productivity: double (nullable = true)
 |-- Plantation Type: string (nullable = false)

+-------------+--------------+----+----------+------------------+---------------+
|Serial_Number|      District|Area|Production|      Productivity|Plantation Type|
+-------------+--------------+----+----------+------------------+---------------+
|            1|      Ariyalur| 112|      2258|20.160714285714285|         Bamboo|
|            2|    Coimbatore|   9|       181| 20.11111111111111|         Bamboo|
|            3|     Cuddalore| 142|      2863|20.161971830985916|         Bamboo|
|            4|    Dharmapuri|  11|       222|20.181818181818183|         Bamboo|
|            5|      Dindigul|  19|       383|20.157894736842106|         Bamboo|
|            6|         Erode| 121|      2439| 20.15702479338843|         Bamboo|

In [0]:
##############################  TEA File Format################################################
file2_location = '/FileStore/shared_uploads/yoganshu.raikhere2@mindtree.com/TeaPlantation.txt'

# Every field is loaded as a string first; the numeric columns are cast to
# int in the select() further down.
Column = StructType([
    StructField("Serial_Number", StringType(), True),
    StructField("District", StringType(), True),
    StructField("Area", StringType(), True),
    StructField("Production", StringType(), True),
    StructField("Productivity", StringType(), True),
])

# Read with default CSV options; only the first column (_c0) is used below.
# NOTE(review): this presumably relies on the raw text containing no commas,
# so that all of the content lands in _c0 -- confirm against the file.
Tea_df = spark.read.csv(file2_location)

# Append a newline after every run of five pipe-terminated fields so that
# each five-field record can be split out of the text in _c0.
tea = Tea_df.withColumn("Col1", F.regexp_replace("_c0", "(.*?\\|){5}", "$0\n"))

# Split on "|" followed by a newline and explode to one record per row.
# The pipe is escaped with a doubled backslash: a bare '\|' inside a normal
# Python string literal is an invalid escape sequence (it triggers a warning
# on recent Python versions). The regex value passed to Spark is unchanged.
Tea_DF = tea.withColumn("Col_Explode", F.explode(F.split('Col1', '\\|\n'))).select('Col_Explode')

# Break each record into its individual fields.
TEA_DF = Tea_DF.select('Col_Explode').rdd.map(lambda x: x[0].split("|"))

tea_df = spark.createDataFrame(TEA_DF, Column)

# Cast the numeric columns to int. Productivity is cast here as well, but it
# is replaced by Production / Area below.
tea_df = tea_df.select(
    col("Serial_Number").cast('int').alias("Serial_Number"),
    col("District"),
    col("Area").cast('int').alias("Area"),
    col("Production").cast('int').alias("Production"),
    col("Productivity").cast('int').alias("Productivity"),
)

# Tag every row with its plantation type so the frames can be told apart
# once they are unioned.
tea_df = tea_df.withColumn("Plantation Type", F.lit("Tea"))

# Recompute Productivity from the cast integer columns.
tea_df = tea_df.withColumn("Productivity", tea_df.Production / tea_df.Area)

tea_df.printSchema()
tea_df.show()

# Summary statistics: max / min production and total area.
for agg_column, agg_func in (("Production", "max"), ("Production", "min"), ("Area", "sum")):
    tea_df.agg({agg_column: agg_func}).show()


root
 |-- Serial_Number: integer (nullable = true)
 |-- District: string (nullable = true)
 |-- Area: integer (nullable = true)
 |-- Production: integer (nullable = true)
 |-- Productivity: double (nullable = true)
 |-- Plantation Type: string (nullable = false)

+-------------+--------------+-----+----------+------------------+---------------+
|Serial_Number|      District| Area|Production|      Productivity|Plantation Type|
+-------------+--------------+-----+----------+------------------+---------------+
|            1|      Ariyalur| null|      null|              null|            Tea|
|            2|    Coimbatore|    4|         6|               1.5|            Tea|
|            3|     Cuddalore| null|      null|              null|            Tea|
|            4|    Dharmapuri| null|      null|              null|            Tea|
|            5|      Dindigul| null|      null|              null|            Tea|
|            6|         Erode| null|      null|              null|      

In [0]:
##########################  Rubber File Format#######################################
file3_location = '/FileStore/shared_uploads/yoganshu.raikhere2@mindtree.com/Rubber_plantation.txt'

# Load the pipe-delimited rubber file (first line is a header) with the
# `schema` defined in the bamboo cell, tag each row with its plantation type,
# and derive Productivity as Production / Area.
rubber_df = (
    spark.read
    .option("delimiter", "|")
    .csv(file3_location, schema=schema, header=True)
    .withColumn("Plantation Type", F.lit("Rubber"))
    .withColumn("Productivity", col("Production") / col("Area"))
)

rubber_df.printSchema()
rubber_df.show()

# Summary statistics: max / min production and total area.
for agg_column, agg_func in (("Production", "max"), ("Production", "min"), ("Area", "sum")):
    rubber_df.agg({agg_column: agg_func}).show()


root
 |-- Serial_Number: integer (nullable = true)
 |-- District: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Production: integer (nullable = true)
 |-- Productivity: double (nullable = true)
 |-- Plantation Type: string (nullable = false)

+-------------+--------------+-----+----------+------------------+---------------+
|Serial_Number|      District| Area|Production|      Productivity|Plantation Type|
+-------------+--------------+-----+----------+------------------+---------------+
|            1|      Ariyalur|   NA|      null|              null|         Rubber|
|            2|    Coimbatore|    4|         6|               1.5|         Rubber|
|            3|     Cuddalore|   NA|      null|              null|         Rubber|
|            4|    Dharmapuri|   NA|      null|              null|         Rubber|
|            5|      Dindigul|   NA|      null|              null|         Rubber|
|            6|         Erode|   NA|      null|              null|       

In [0]:
########################  Union  #####################################################
import functools

def unionAll(dfs):
    """Fold a sequence of DataFrames into one by successive unions.

    Each incoming frame is first re-selected in the column order of the
    accumulated frame, so frames whose columns are ordered differently are
    still lined up by name before the positional union.

    Parameters:
        dfs: iterable of DataFrames to combine, in order.

    Returns:
        The union of all frames; a single-element input is returned as is.
    """
    def _append(accumulated, incoming):
        aligned = incoming.select(accumulated.columns)
        return accumulated.union(aligned)

    return functools.reduce(_append, dfs)
# Stack the three plantation frames (bamboo, tea, rubber) into one frame.
plantation_frames = [bamboo_df, tea_df, rubber_df]
unioned_df = unionAll(plantation_frames)
unioned_df.show()

+-------------+--------------+----+----------+------------------+---------------+
|Serial_Number|      District|Area|Production|      Productivity|Plantation Type|
+-------------+--------------+----+----------+------------------+---------------+
|            1|      Ariyalur| 112|      2258|20.160714285714285|         Bamboo|
|            2|    Coimbatore|   9|       181| 20.11111111111111|         Bamboo|
|            3|     Cuddalore| 142|      2863|20.161971830985916|         Bamboo|
|            4|    Dharmapuri|  11|       222|20.181818181818183|         Bamboo|
|            5|      Dindigul|  19|       383|20.157894736842106|         Bamboo|
|            6|         Erode| 121|      2439| 20.15702479338843|         Bamboo|
|            7|  Kancheepuram|  67|      1351| 20.16417910447761|         Bamboo|
|            8|   Kanyakumari|   2|        40|              20.0|         Bamboo|
|            9|         Karur|   4|        81|             20.25|         Bamboo|
|           10| 