In [1]:
!pip install pyspark

Collecting pyspark
  Using cached pyspark-3.4.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.7
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.0


In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col,struct,when,explode
from pyspark.sql import Row
from pyspark.sql.types import StringType,IntegerType,FloatType,StructType,StructField

In [3]:
spark = SparkSession.builder.appName("pysparktest").getOrCreate()

23/06/18 20:46:24 WARN Utils: Your hostname, Kunals-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.68.50 instead (on interface en0)
23/06/18 20:46:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/18 20:46:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
rdd1 = spark._sc.parallelize([1,2,3])
rdd1.collect()

                                                                                

[1, 2, 3]

In [6]:
data = [(1,"AZ-201",23.2),(2,"DP-104",112.2),(3,"DP-203",99.89)]
rdd1 = spark._sc.parallelize(data)

In [7]:
rdd1.collect()

[(1, 'AZ-201', 23.2), (2, 'DP-104', 112.2), (3, 'DP-203', 99.89)]

In [8]:
df1 = rdd1.toDF()

                                                                                

In [9]:
df1.show()

+---+------+-----+
| _1|    _2|   _3|
+---+------+-----+
|  1|AZ-201| 23.2|
|  2|DP-104|112.2|
|  3|DP-203|99.89|
+---+------+-----+



In [10]:
schema = StructType([StructField("ID",IntegerType(),True),StructField("Course",StringType(),True),StructField("Price",FloatType(),True)])

In [11]:
df1 = spark.createDataFrame(data = data,schema= schema)
df1.show()

+---+------+-----+
| ID|Course|Price|
+---+------+-----+
|  1|AZ-201| 23.2|
|  2|DP-104|112.2|
|  3|DP-203|99.89|
+---+------+-----+



In [12]:
df1.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Course: string (nullable = true)
 |-- Price: float (nullable = true)



In [13]:
df1.columns

['ID', 'Course', 'Price']

In [14]:
type(df1)

pyspark.sql.dataframe.DataFrame

In [15]:
df1.sort(F.col("Price").desc()).show()



+---+------+-----+
| ID|Course|Price|
+---+------+-----+
|  2|DP-104|112.2|
|  3|DP-203|99.89|
|  1|AZ-201| 23.2|
+---+------+-----+



                                                                                

In [16]:
df1.where("Course == 'DP-104'").show()
df1.where(F.col("Course")=="DP-104").show()

+---+------+-----+
| ID|Course|Price|
+---+------+-----+
|  2|DP-104|112.2|
+---+------+-----+

+---+------+-----+
| ID|Course|Price|
+---+------+-----+
|  2|DP-104|112.2|
+---+------+-----+



In [51]:
df1.agg(F.avg(F.col("Price"))).show()


[Stage 35:>                                                         (0 + 8) / 8]

+-----------------+
|       avg(Price)|
+-----------------+
|78.42999903361003|
+-----------------+




                                                                                

In [130]:
import os
os.listdir()

['.DS_Store',
 'Pyspark_new.ipynb',
 'restaurant_orders.csv',
 'spark-warehouse',
 '.ipynb_checkpoints',
 'Pyspark_Python.ipynb',
 'data']

In [132]:
os.listdir('data')

['Log.parquet', 'customer_obj.json', 'customer_arr.json']

In [145]:
jsondf = spark.read.options(header = "true",inferSchema = "true").format("json").load("data/customer_arr.json")
jsondf.show(truncate= False)

+------------------------+----------+------------+----------+
|courses                 |customerid|customername|registered|
+------------------------+----------+------------+----------+
|[AZ-900, AZ-500, AZ-303]|1         |UserA       |true      |
|[AZ-104, AZ-500, DP-200]|2         |UserB       |true      |
+------------------------+----------+------------+----------+



In [134]:
logdf = spark.read.options(header="true",inferSchema = "true").format("parquet").load("data/Log.parquet")
logdf.show(10)


[Stage 123:>                                                        (0 + 1) / 1]

+---+--------------------+--------------------+---------+--------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| Id|       Correlationid|       Operationname|   Status| Eventcategory|        Level|                Time|        Subscription|    Eventinitiatedby|        Resourcetype|       Resourcegroup|
+---+--------------------+--------------------+---------+--------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  1|66641e13-d19f-4ce...| Delete SQL database|Succeeded|Administrative|Informational|2021-06-15 10:14:...|20c6eec9-2d80-470...|Microsoft Azure S...|Microsoft.Sql/ser...|synapseworkspace-...|
|  2|66641e13-d19f-4ce...| Delete SQL database|  Started|Administrative|Informational|2021-06-15 10:14:...|20c6eec9-2d80-470...|Microsoft Azure S...|Microsoft.Sql/ser...|synapseworkspace-...|
|  3|66641e13-d19f-4ce...| Delete SQL da


                                                                                

In [143]:
logdf.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Correlationid: string (nullable = true)
 |-- Operationname: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Eventcategory: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Subscription: string (nullable = true)
 |-- Eventinitiatedby: string (nullable = true)
 |-- Resourcetype: string (nullable = true)
 |-- Resourcegroup: string (nullable = true)



In [129]:
df2 = spark.read.option("header","true").format("csv").load("restaurant_orders.csv")
df2.show(15)

+------------+----------------+--------------------+--------+-------------+--------------+
|Order Number|      Order Date|           Item Name|Quantity|Product Price|Total products|
+------------+----------------+--------------------+--------+-------------+--------------+
|       16118|03/08/2019 20:25|       Plain Papadum|       2|          0.8|             6|
|       16118|03/08/2019 20:25|    King Prawn Balti|       1|        12.95|             6|
|       16118|03/08/2019 20:25|         Garlic Naan|       1|         2.95|             6|
|       16118|03/08/2019 20:25|       Mushroom Rice|       1|         3.95|             6|
|       16118|03/08/2019 20:25| Paneer Tikka Masala|       1|         8.95|             6|
|       16118|03/08/2019 20:25|       Mango Chutney|       1|          0.5|             6|
|       16117|03/08/2019 20:17|          Plain Naan|       1|          2.6|             7|
|       16117|03/08/2019 20:17|       Mushroom Rice|       1|         3.95|             7|

In [68]:
df3 = spark.read.options( header = "true",inferSchema="true").csv("restaurant_orders.csv")
df3.show(10)

+------------+----------------+--------------------+--------+-------------+--------------+
|Order Number|      Order Date|           Item Name|Quantity|Product Price|Total products|
+------------+----------------+--------------------+--------+-------------+--------------+
|       16118|03/08/2019 20:25|       Plain Papadum|       2|          0.8|             6|
|       16118|03/08/2019 20:25|    King Prawn Balti|       1|        12.95|             6|
|       16118|03/08/2019 20:25|         Garlic Naan|       1|         2.95|             6|
|       16118|03/08/2019 20:25|       Mushroom Rice|       1|         3.95|             6|
|       16118|03/08/2019 20:25| Paneer Tikka Masala|       1|         8.95|             6|
|       16118|03/08/2019 20:25|       Mango Chutney|       1|          0.5|             6|
|       16117|03/08/2019 20:17|          Plain Naan|       1|          2.6|             7|
|       16117|03/08/2019 20:17|       Mushroom Rice|       1|         3.95|             7|

In [70]:
df3.printSchema()

root
 |-- Order Number: integer (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Item Name: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Product Price: double (nullable = true)
 |-- Total products: integer (nullable = true)



#### Without using inferschema

In [63]:
df2.printSchema()

root
 |-- Order Number: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Item Name: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Product Price: string (nullable = true)
 |-- Total products: string (nullable = true)



In [59]:
df2.count()

74818

In [61]:
df2.summary().show()

23/04/16 21:29:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------------+----------------+--------------+------------------+-----------------+------------------+
|summary|      Order Number|      Order Date|     Item Name|          Quantity|    Product Price|    Total products|
+-------+------------------+----------------+--------------+------------------+-----------------+------------------+
|  count|             74818|           74818|         74818|             74818|            74818|             74818|
|   mean|  9115.63816193964|            null|          null|  1.24356438290251|5.286491886982787|  6.93143361223235|
| stddev|4052.2104520331745|            null|          null|0.7982073410496792|  3.3382213559897|3.9548324912473527|
|    min|             10000|01/01/2017 17:31|    Aloo Chaat|                 1|              0.5|                 1|
|    25%|            5590.0|            null|          null|               1.0|             2.95|               5.0|
|    50%|            9102.0|            null|          null|    

In [62]:
df2.columns

['Order Number',
 'Order Date',
 'Item Name',
 'Quantity',
 'Product Price',
 'Total products']

In [72]:
df3.schema.json()

'{"fields":[{"metadata":{},"name":"Order Number","nullable":true,"type":"integer"},{"metadata":{},"name":"Order Date","nullable":true,"type":"string"},{"metadata":{},"name":"Item Name","nullable":true,"type":"string"},{"metadata":{},"name":"Quantity","nullable":true,"type":"integer"},{"metadata":{},"name":"Product Price","nullable":true,"type":"double"},{"metadata":{},"name":"Total products","nullable":true,"type":"integer"}],"type":"struct"}'

In [85]:
newdf2 = df2.withColumn("NewInfo",struct(col("Order Number").alias("Order_num"),when(col("Product Price")>5.0,"Cheap").otherwise("Expensive").alias("Cost"))).drop('Order Number',
 'Order Date',
 'Item Name',
 'Quantity')
newdf2.printSchema()

root
 |-- Product Price: string (nullable = true)
 |-- Total products: string (nullable = true)
 |-- NewInfo: struct (nullable = false)
 |    |-- Order_num: string (nullable = true)
 |    |-- Cost: string (nullable = false)



In [141]:
logdf.filter(col("Resourcegroup").isNull()).select("Id","Operationname","Status","Time","Resourcegroup","ResourceType").show(5)


+---+--------------------+---------+--------------------+-------------+------------+
| Id|       Operationname|   Status|                Time|Resourcegroup|ResourceType|
+---+--------------------+---------+--------------------+-------------+------------+
|195|Check Server Name...|Succeeded|2021-06-14 18:14:...|         null|        null|
|196|Check Server Name...|  Started|2021-06-14 18:14:...|         null|        null|
|197|Check Server Name...|Succeeded|2021-06-14 18:13:...|         null|        null|
|198|Check Server Name...|  Started|2021-06-14 18:13:...|         null|        null|
|199|Check Server Name...|Succeeded|2021-06-14 18:13:...|         null|        null|
+---+--------------------+---------+--------------------+-------------+------------+
only showing top 5 rows



In [142]:
logdf.filter(col("Resourcegroup").isNotNull()).select("Id","Operationname","Status","Time","Resourcegroup","ResourceType").show(5)


+---+-------------------+---------+--------------------+--------------------+--------------------+
| Id|      Operationname|   Status|                Time|       Resourcegroup|        ResourceType|
+---+-------------------+---------+--------------------+--------------------+--------------------+
|  1|Delete SQL database|Succeeded|2021-06-15 10:14:...|synapseworkspace-...|Microsoft.Sql/ser...|
|  2|Delete SQL database|  Started|2021-06-15 10:14:...|synapseworkspace-...|Microsoft.Sql/ser...|
|  3|Delete SQL database| Accepted|2021-06-15 10:14:...|synapseworkspace-...|Microsoft.Sql/ser...|
|  4|    Delete SqlPools|Succeeded|2021-06-15 10:14:...|             new-grp|Microsoft.Synapse...|
|  5|    Delete SqlPools|  Started|2021-06-15 10:14:...|             new-grp|Microsoft.Synapse...|
+---+-------------------+---------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [109]:
df3.groupBy(col("Item Name")).count().select(col("Item Name"),col("count").alias("Total_count")).sort(col("Total_count").desc()).show()


+--------------------+-----------+
|           Item Name|Total_count|
+--------------------+-----------+
|          Pilau Rice|       4721|
|          Plain Naan|       3753|
|       Plain Papadum|       3598|
|         Garlic Naan|       2628|
|        Onion Bhajee|       2402|
|          Plain Rice|       2369|
|Chicken Tikka Masala|       2133|
|       Mango Chutney|       2070|
|         Bombay Aloo|       1752|
|       Peshwari Naan|       1535|
|          Mint Sauce|       1463|
|       Mushroom Rice|       1452|
|          Keema Naan|       1362|
|               Korma|       1201|
|           Saag Aloo|       1194|
|         Meat Samosa|       1192|
|             Chapati|       1170|
|       Onion Chutney|       1033|
|      Butter Chicken|        980|
|     Korma - Chicken|        943|
+--------------------+-----------+
only showing top 20 rows



In [119]:
import datetime as dt
curr = dt.datetime.now()

In [128]:
df1.withColumn("ss",lit(curr)).select(col("ss"),year(col("ss")),month(col("ss")),dayofyear(col("ss")),to_date(col("ss"),"dd/mm/yyyy")).show(truncate = False)


+-------------------------+--------+---------+-------------+-----------------------+
|ss                       |year(ss)|month(ss)|dayofyear(ss)|to_date(ss, dd/mm/yyyy)|
+-------------------------+--------+---------+-------------+-----------------------+
|2023-04-16 23:32:52.58307|2023    |4        |106          |2023-04-16             |
|2023-04-16 23:32:52.58307|2023    |4        |106          |2023-04-16             |
|2023-04-16 23:32:52.58307|2023    |4        |106          |2023-04-16             |
+-------------------------+--------+---------+-------------+-----------------------+



In [146]:
jsondf.show()

+--------------------+----------+------------+----------+
|             courses|customerid|customername|registered|
+--------------------+----------+------------+----------+
|[AZ-900, AZ-500, ...|         1|       UserA|      true|
|[AZ-104, AZ-500, ...|         2|       UserB|      true|
+--------------------+----------+------------+----------+



In [148]:
jsondf.select(explode(col("courses")).alias("Courses"),col("customerid"),col("customername"),col("registered")).show()

+-------+----------+------------+----------+
|Courses|customerid|customername|registered|
+-------+----------+------------+----------+
| AZ-900|         1|       UserA|      true|
| AZ-500|         1|       UserA|      true|
| AZ-303|         1|       UserA|      true|
| AZ-104|         2|       UserB|      true|
| AZ-500|         2|       UserB|      true|
| DP-200|         2|       UserB|      true|
+-------+----------+------------+----------+



In [150]:
os.listdir('data')

['Log.parquet', 'customer_obj.json', 'customer_arr.json']

In [5]:
custdf = spark.read.options(header= "true",inferschema = "true").format("json").load('data/customer_obj.json')
custdf.show(truncate=False)

                                                                                

+------------------------+----------+------------+-----------------+----------+
|courses                 |customerid|customername|details          |registered|
+------------------------+----------+------------+-----------------+----------+
|[AZ-900, AZ-500, AZ-303]|1         |UserA       |{CityA, 111-1112}|true      |
|[AZ-104, AZ-500, DP-200]|2         |UserB       |{CityB, 333-1112}|true      |
+------------------------+----------+------------+-----------------+----------+



In [6]:
custdf.printSchema()

root
 |-- courses: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customerid: long (nullable = true)
 |-- customername: string (nullable = true)
 |-- details: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- mobile: string (nullable = true)
 |-- registered: boolean (nullable = true)



In [14]:
custdf.select(col("details.*")).show()

+-----+--------+
| city|  mobile|
+-----+--------+
|CityA|111-1112|
|CityB|333-1112|
+-----+--------+



In [9]:
custdf.select(col('customername'),col('details.city'),col('details').mobile,explode(col("courses"))).show()

+------------+-----+--------------+------+
|customername| city|details.mobile|   col|
+------------+-----+--------------+------+
|       UserA|CityA|      111-1112|AZ-900|
|       UserA|CityA|      111-1112|AZ-500|
|       UserA|CityA|      111-1112|AZ-303|
|       UserB|CityB|      333-1112|AZ-104|
|       UserB|CityB|      333-1112|AZ-500|
|       UserB|CityB|      333-1112|DP-200|
+------------+-----+--------------+------+



In [12]:
schema2 = StructType().add("col1",StringType()).add("col2",IntegerType())

In [13]:
schema2.json()

'{"fields":[{"metadata":{},"name":"col1","nullable":true,"type":"string"},{"metadata":{},"name":"col2","nullable":true,"type":"integer"}],"type":"struct"}'