In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark

In [2]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col, max, min, sum, mean, explode


In [3]:
# Create (or reuse) the notebook-wide SparkSession and keep a handle on its
# SparkContext for the RDD-based cells further down.
spark = (
    SparkSession.builder
    .appName("ScalaToPySpark")
    .getOrCreate()
)
sc = spark.sparkContext

In [4]:
data = sc.parallelize(range(1,31))

In [5]:
# Pair every number with its square: n -> (n, n^2).
mapdata = data.map(lambda n: (n, n ** 2))

In [6]:
df = mapdata.toDF(["number","Square"])

In [7]:
df.show()

+------+------+
|number|Square|
+------+------+
|     1|     1|
|     2|     4|
|     3|     9|
|     4|    16|
|     5|    25|
|     6|    36|
|     7|    49|
|     8|    64|
|     9|    81|
|    10|   100|
|    11|   121|
|    12|   144|
|    13|   169|
|    14|   196|
|    15|   225|
|    16|   256|
|    17|   289|
|    18|   324|
|    19|   361|
|    20|   400|
+------+------+
only showing top 20 rows



In [8]:
#Write the data into hive 
#df.write.saveAsTable("hivetable")

In [9]:
#To write the data into specified database
#df.coalesce(1).write.mode("overwrite").saveAsTable("classdb.hivetable") # By default the table is saved in Parquet format

In [10]:
#Hive commands
#show databases;

In [11]:
#spark.sql("drop database sparkdb CASCADE")

In [12]:
#spark.sql("create database sparkdb")

In [13]:
#spark.sql("create table if not exists sparkdb.empdetails(empId int, name string, loc string, pannum string, yrsofexp float,skillset string)")

In [14]:
#spark.sql("LOAD DATA LOCAL INPATH 'file:///C:/Users/aksha/Pyspark/empdetails.csv' OVERWRITE INTO TABLE sparkdb.empdetails")

In [15]:
# Read the tab-delimited log with header=false: Spark assigns positional
# names (_c0, _c1, _c2), which are then renamed. The file's own header line
# is therefore read as an ordinary data row.
df1 = (
    spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", "\t")
    .load("file:///C:/Users/aksha/Pyspark/tabData.log")
    .selectExpr("_c0 as ID", "_c1 as NAME", "_c2 as SAL")
)

In [16]:
df1.count()

13

In [17]:
df1.show()

+-----+-------+-------+
|   ID|   NAME|    SAL|
+-----+-------+-------+
|EMPID|  ENAME|ESALARY|
| 1111|  VVRaj|  12000|
| 1331|Krishna|  14000|
| 1551|   Mike|  16000|
| 1002|  Ramya|  24000|
| 1900| Rakesh|  34000|
| 1455| Dr.Ali|  90000|
| 8005|Mounika|  24000|
| 1006| Sourav|  26000|
| 1007|   Siya|  98000|
| 1008| Bhavya|  30000|
| 1009|Trinath|  34000|
| 2000|  Komal|  82000|
+-----+-------+-------+



In [18]:
# Same file, this time with header=true: the first line supplies the column
# names, so no selectExpr-based renaming of _c0/_c1/_c2 is needed.
df2 = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load("file:///C:/Users/aksha/Pyspark/tabData.log")
)

In [19]:
df2.count()

12

In [20]:
df2.show()

+-----+-------+-------+
|EMPID|  ENAME|ESALARY|
+-----+-------+-------+
| 1111|  VVRaj|  12000|
| 1331|Krishna|  14000|
| 1551|   Mike|  16000|
| 1002|  Ramya|  24000|
| 1900| Rakesh|  34000|
| 1455| Dr.Ali|  90000|
| 8005|Mounika|  24000|
| 1006| Sourav|  26000|
| 1007|   Siya|  98000|
| 1008| Bhavya|  30000|
| 1009|Trinath|  34000|
| 2000|  Komal|  82000|
+-----+-------+-------+



In [21]:
jsonDF = spark.read.json("file:///C:/Users/aksha/Pyspark/edetails.json")

In [22]:
jsonDF.show()

+--------------------+----------+--------------+----------------+-----+
|             address|      city|          dept|            name|  sal|
+--------------------+----------+--------------+----------------+-----+
|[{"hno":"1-41-165"}]| newjersey|          NULL|          Martin|80000|
|                NULL|   newyork|          NULL|            Mike|75000|
|                NULL|      peru|     LOGISTICS|          Andrew|87000|
|                NULL|      NULL|           HRD|           Teena|57000|
|                 USA| elizabeth|          NULL|           clark|92000|
|                NULL|    boston|          NULL|          Aneesh|65000|
|                NULL|   atlanta|     TRANSPORT|           chloe|78000|
|                NULL|      NULL|     MARKETING|          Aakash|77000|
|                 USA|jerseycity|          NULL|          daniel|82000|
|                NULL|    Vienna|          NULL|         Richard|63000|
|                NULL|  Dornbirn|   IMMIGRATION|          Faisal

In [23]:
# Replace nulls in the `dept` column only; every other column is left as-is.
jsonDF.fillna("TEST_DEPT", subset=["dept"]).show()

+--------------------+----------+--------------+----------------+-----+
|             address|      city|          dept|            name|  sal|
+--------------------+----------+--------------+----------------+-----+
|[{"hno":"1-41-165"}]| newjersey|     TEST_DEPT|          Martin|80000|
|                NULL|   newyork|     TEST_DEPT|            Mike|75000|
|                NULL|      peru|     LOGISTICS|          Andrew|87000|
|                NULL|      NULL|           HRD|           Teena|57000|
|                 USA| elizabeth|     TEST_DEPT|           clark|92000|
|                NULL|    boston|     TEST_DEPT|          Aneesh|65000|
|                NULL|   atlanta|     TRANSPORT|           chloe|78000|
|                NULL|      NULL|     MARKETING|          Aakash|77000|
|                 USA|jerseycity|     TEST_DEPT|          daniel|82000|
|                NULL|    Vienna|     TEST_DEPT|         Richard|63000|
|                NULL|  Dornbirn|   IMMIGRATION|          Faisal

In [24]:
# Fill the null values of two different columns, each with its own replacement value
#jsonDF.na.fill({"dept":"TEST_DEPT","city":"USA"}).show()

In [25]:
data1 = sc.textFile("file:///C:/Users/aksha/Pyspark/hivejoinData.log")

In [26]:
cdata = data1.map(lambda x: x.split(","))

In [27]:
# Explicit schema for the comma-separated employee records; every field is nullable.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("name", StringType(), True)
    .add("loc", StringType(), True)
    .add("pan", StringType(), True)
    .add("yrsOfExp", DoubleType(), True)
    .add("skillset", StringType(), True)
)

In [28]:
def _parse_years(raw):
    """Convert a years-of-experience field to a float.

    Args:
        raw: The unparsed text of the field (surrounding whitespace allowed).

    Returns:
        The parsed value, or 0.0 when the text is not a plain non-negative
        decimal number (empty, alphabetic, signed, several dots, ...).
    """
    text = raw.strip()
    # Remove at most ONE '.', so that a value such as "1.2.3" fails the digit
    # check here instead of reaching float() and raising ValueError inside
    # the Spark task.
    if not text.replace('.', '', 1).isdigit():
        return 0.0
    try:
        return float(text)
    except ValueError:
        # str.isdigit() accepts a few digit characters (e.g. superscripts)
        # that float() cannot parse; treat them like any other bad value.
        return 0.0


# Turn each split line into a typed tuple whose field order matches `schema`
# (id, name, loc, pan, yrsOfExp, skillset).
mappingFile = cdata.map(lambda x: (int(x[0]),
                                   x[1],
                                   x[2],
                                   x[3],
                                   _parse_years(x[4]),
                                   x[5]))

In [29]:
df = spark.createDataFrame(mappingFile, schema=schema)

In [30]:
df.show()

+---+-------+---------+----------+--------+--------+
| id|   name|      loc|       pan|yrsOfExp|skillset|
+---+-------+---------+----------+--------+--------+
|100|  GOPAL|HYDERABAD|AAZPT1234J|    14.0| BIGDATA|
|101|   ROJA|BANGALORE|AZJJI2324N|    13.0|     SAP|
|102| RAKESH|  CHENNAI|JIRTY2034N|     9.0|    JAVA|
|111|  MANJU|     PUNE|ERYUR6767K|    11.0| DOT NET|
|222|  SANJU|   MUMBAI|TRUTY7876B|    10.0|      BI|
|333|SHANKAR|HYDERABAD| RTUTL590H|     6.0|     C++|
+---+-------+---------+----------+--------+--------+



In [31]:
df2 = spark.read.parquet("file:///C:/Users/aksha/Pyspark/emp/part-00000-9c3ad50f-fba8-4380-824a-9a33a3d95079-c000.snappy.parquet")

In [32]:
df2.show()

+-----+-----+-----+
|empid|ename| esal|
+-----+-----+-----+
|   80|EMP49|49000|
|   95|GOPAL|12000|
|   99| EMP2|25000|
|  100| EMP1|12000|
|  101| EMP2|14000|
|  102| EMP3|16000|
|  103| EMP4|18000|
|  104| EMP5|20000|
|  105| EMP6|22000|
|  106| EMP7|24000|
|  107| EMP8|26000|
|  108| EMP9|28000|
|  109|EMP10|30000|
|  110|EMP34|45000|
|  114|EMP35|49000|
|  115|EMP36|52000|
|  116|EMP38|55000|
|  117|EMP39|59000|
|  118|EMP40|62000|
|  119|EMP38|55000|
+-----+-----+-----+



In [33]:
df.createOrReplaceTempView("textTab")

In [34]:
# Register the parquet-backed DataFrame (df2) under "parquettab". Registering
# `df` here would make "parquettab" a duplicate of the "textTab" view.
df2.createOrReplaceTempView("parquettab")

In [35]:
#spark.sql("select    from textTab t")

In [36]:
# Inner join (the default join type): keep only ids present in both DataFrames.
df.join(df2, on=df["id"] == df2["empid"], how="inner").show()

+---+------+---------+----------+--------+--------+-----+-----+-----+
| id|  name|      loc|       pan|yrsOfExp|skillset|empid|ename| esal|
+---+------+---------+----------+--------+--------+-----+-----+-----+
|100| GOPAL|HYDERABAD|AAZPT1234J|    14.0| BIGDATA|  100| EMP1|12000|
|101|  ROJA|BANGALORE|AZJJI2324N|    13.0|     SAP|  101| EMP2|14000|
|102|RAKESH|  CHENNAI|JIRTY2034N|     9.0|    JAVA|  102| EMP3|16000|
+---+------+---------+----------+--------+--------+-----+-----+-----+



In [37]:
print("df columns:", df.columns)
print("df2 columns:", df2.columns)

df columns: ['id', 'name', 'loc', 'pan', 'yrsOfExp', 'skillset']
df2 columns: ['empid', 'ename', 'esal']


In [38]:
# Left outer join: keep every row of df; unmatched df2 columns become NULL.
df.join(df2, on=df["id"] == df2["empid"], how="left_outer").show()

+---+-------+---------+----------+--------+--------+-----+-----+-----+
| id|   name|      loc|       pan|yrsOfExp|skillset|empid|ename| esal|
+---+-------+---------+----------+--------+--------+-----+-----+-----+
|100|  GOPAL|HYDERABAD|AAZPT1234J|    14.0| BIGDATA|  100| EMP1|12000|
|101|   ROJA|BANGALORE|AZJJI2324N|    13.0|     SAP|  101| EMP2|14000|
|102| RAKESH|  CHENNAI|JIRTY2034N|     9.0|    JAVA|  102| EMP3|16000|
|111|  MANJU|     PUNE|ERYUR6767K|    11.0| DOT NET| NULL| NULL| NULL|
|222|  SANJU|   MUMBAI|TRUTY7876B|    10.0|      BI| NULL| NULL| NULL|
|333|SHANKAR|HYDERABAD| RTUTL590H|     6.0|     C++| NULL| NULL| NULL|
+---+-------+---------+----------+--------+--------+-----+-----+-----+



In [39]:
# Right outer join: keep every row of df2; unmatched df columns become NULL.
df.join(df2, on=df["id"] == df2["empid"], how="right_outer").show()

+----+------+---------+----------+--------+--------+-----+-----+-----+
|  id|  name|      loc|       pan|yrsOfExp|skillset|empid|ename| esal|
+----+------+---------+----------+--------+--------+-----+-----+-----+
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  108| EMP9|28000|
| 101|  ROJA|BANGALORE|AZJJI2324N|    13.0|     SAP|  101| EMP2|14000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  115|EMP36|52000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  103| EMP4|18000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  117|EMP39|59000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  107| EMP8|26000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  114|EMP35|49000|
| 100| GOPAL|HYDERABAD|AAZPT1234J|    14.0| BIGDATA|  100| EMP1|12000|
| 102|RAKESH|  CHENNAI|JIRTY2034N|     9.0|    JAVA|  102| EMP3|16000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|   80|EMP49|49000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|   95|GOPAL|12000|
|NULL|

In [40]:
# Full outer join: keep the rows of both sides; the missing side is NULL.
df.join(df2, on=df["id"] == df2["empid"], how="full_outer").show()

+----+------+---------+----------+--------+--------+-----+-----+-----+
|  id|  name|      loc|       pan|yrsOfExp|skillset|empid|ename| esal|
+----+------+---------+----------+--------+--------+-----+-----+-----+
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|   80|EMP49|49000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|   95|GOPAL|12000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|   99| EMP2|25000|
| 100| GOPAL|HYDERABAD|AAZPT1234J|    14.0| BIGDATA|  100| EMP1|12000|
| 101|  ROJA|BANGALORE|AZJJI2324N|    13.0|     SAP|  101| EMP2|14000|
| 102|RAKESH|  CHENNAI|JIRTY2034N|     9.0|    JAVA|  102| EMP3|16000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  103| EMP4|18000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  104| EMP5|20000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  105| EMP6|22000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  106| EMP7|24000|
|NULL|  NULL|     NULL|      NULL|    NULL|    NULL|  107| EMP8|26000|
|NULL|

In [41]:
datadf = spark.read.json("file:///C:/Users/aksha/Pyspark/ComplexJsonData.json")

In [42]:
# withColumn returns a new DataFrame with an added column, typically after performing a column operation.

In [43]:
datadf.printSchema()

root
 |-- instid: long (nullable = true)
 |-- instname: string (nullable = true)
 |-- inststatus: string (nullable = true)
 |-- pathdet: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- docid: long (nullable = true)
 |    |    |-- enddate: string (nullable = true)
 |    |    |-- plnstid: long (nullable = true)
 |    |    |-- startdate: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |    |-- workitemid: long (nullable = true)



In [44]:
datadf.show()

+------+---------------+----------+--------------------+
|instid|       instname|inststatus|             pathdet|
+------+---------------+----------+--------------------+
| 12345|789654123125485|  Released|[{52312, 2017-12-...|
| 12347|789654123135452|   Pending|[{52315, 2017-12-...|
+------+---------------+----------+--------------------+



In [45]:
# explode() emits one output row per element of the `pathdet` array. The
# element lands in the new struct column `pathdet1`, while the original
# array column is kept alongside it.
dfdata1 = datadf.withColumn("pathdet1",explode(col("pathdet")))

In [46]:
dfdata1.printSchema()

root
 |-- instid: long (nullable = true)
 |-- instname: string (nullable = true)
 |-- inststatus: string (nullable = true)
 |-- pathdet: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- docid: long (nullable = true)
 |    |    |-- enddate: string (nullable = true)
 |    |    |-- plnstid: long (nullable = true)
 |    |    |-- startdate: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |    |-- workitemid: long (nullable = true)
 |-- pathdet1: struct (nullable = true)
 |    |-- docid: long (nullable = true)
 |    |-- enddate: string (nullable = true)
 |    |-- plnstid: long (nullable = true)
 |    |-- startdate: string (nullable = true)
 |    |-- username: string (nullable = true)
 |    |-- workitemid: long (nullable = true)



In [47]:
dfdata1.show()

+------+---------------+----------+--------------------+--------------------+
|instid|       instname|inststatus|             pathdet|            pathdet1|
+------+---------------+----------+--------------------+--------------------+
| 12345|789654123125485|  Released|[{52312, 2017-12-...|{52312, 2017-12-0...|
| 12345|789654123125485|  Released|[{52312, 2017-12-...|{52312, 2017-12-0...|
| 12345|789654123125485|  Released|[{52312, 2017-12-...|{52312, 2017-12-0...|
| 12347|789654123135452|   Pending|[{52315, 2017-12-...|{52315, 2017-12-0...|
| 12347|789654123135452|   Pending|[{52315, 2017-12-...|{52315, , 12347, ...|
+------+---------------+----------+--------------------+--------------------+



In [48]:
# Flatten the exploded struct: keep the instance-level columns and promote
# the selected `pathdet1` fields to top-level columns of the same name.
dfdata2 = dfdata1.select(
    "instid",
    "instname",
    "inststatus",
    *[
        col(f"pathdet1.{field}").alias(field)
        for field in ("docid", "workitemid", "username", "startdate", "enddate")
    ]
)

In [49]:
dfdata2.printSchema()

root
 |-- instid: long (nullable = true)
 |-- instname: string (nullable = true)
 |-- inststatus: string (nullable = true)
 |-- docid: long (nullable = true)
 |-- workitemid: long (nullable = true)
 |-- username: string (nullable = true)
 |-- startdate: string (nullable = true)
 |-- enddate: string (nullable = true)



In [50]:
dfdata2.show()

+------+---------------+----------+-----+----------+--------+-------------------+-------------------+
|instid|       instname|inststatus|docid|workitemid|username|          startdate|            enddate|
+------+---------------+----------+-----+----------+--------+-------------------+-------------------+
| 12345|789654123125485|  Released|52312|       235|     dex|2017-12-01 15:05:23|2017-12-01 15:05:56|
| 12345|789654123125485|  Released|52312|       236|     dex|2017-12-01 15:05:56|2017-12-01 15:06:23|
| 12345|789654123125485|  Released|52312|       238|     mhx|2017-12-01 15:06:23|2017-12-01 15:06:45|
| 12347|789654123135452|   Pending|52315|       242|     dex|2017-12-01 15:07:45|2017-12-01 15:08:31|
| 12347|789654123135452|   Pending|52315|       244|     dex|2017-12-01 15:08:31|                   |
+------+---------------+----------+-----+----------+--------+-------------------+-------------------+



In [51]:
# Schema for the small in-memory person table; all three fields are nullable.
person_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("name", StringType()),
        ("age", IntegerType()),
        ("address", StringType()),
    )
])

In [52]:
data = [
    ("Raja", 21, "HYD"),
    ("Ramya", 34, "BAN"),
    ("Rani", 30, "MUM")
]

In [53]:
df = spark.createDataFrame(data, person_schema)

In [54]:
df.show()

+-----+---+-------+
| name|age|address|
+-----+---+-------+
| Raja| 21|    HYD|
|Ramya| 34|    BAN|
| Rani| 30|    MUM|
+-----+---+-------+



In [55]:
# Keep only the people older than 25.
df.where(col("age") > 25).show()

+-----+---+-------+
| name|age|address|
+-----+---+-------+
|Ramya| 34|    BAN|
| Rani| 30|    MUM|
+-----+---+-------+



## Converting dataframe to dataset 
PySpark does not support the typed Dataset API (it is available only in Scala and Java); in Python you work with DataFrames only.

In [56]:
df = spark.read.json("file:///C:/Users/aksha/Pyspark/InputData.json")

In [57]:
df.columns

['Address', 'Age', 'Desg', 'State', 'YrsOfExp', 'name']

In [58]:
# Plain alias: `ds` refers to the very same DataFrame object as `df`
# (no conversion or copy takes place).
ds = df

In [59]:
ds.columns

['Address', 'Age', 'Desg', 'State', 'YrsOfExp', 'name']

In [60]:
ds.show()

+-----------+----+--------------+-------------+--------+------------+
|    Address| Age|          Desg|        State|YrsOfExp|        name|
+-----------+----+--------------+-------------+--------+------------+
|  Hyderabad|NULL|           STA|    Telangana|    12.5|       Gopal|
|  Bangalore|   6|          NULL|    Karnataka|    NULL|     Mounika|
|    Chennai|  22|   Sw Engineer|    TamilNadu|     1.2|       Ramya|
|  Hyderabad|NULL|            TA|    Telangana|    12.5|      Sekhar|
|  Bangalore|  12|          NULL|    Karnataka|    NULL|      Reshma|
|       Pune|  22|   Sw Engineer|   Maharastra|     1.2|       Ramya|
|  Hyderabad|NULL|Senior Analyst|    Telangana|     9.5| Ravichandra|
|  Bangalore|  18|          NULL|    Karnataka|    NULL|     Meghana|
|      Delhi|  22|   Sw Engineer|        Delhi|     1.2|      Ravali|
|  Hyderabad|NULL|           STA|    Telangana|    12.5| MahaLakshmi|
|  Bangalore|   6|          NULL|    Karnataka|    NULL|SouravMishra|
|     Indore|  22|  