In [0]:
from pyspark.sql import SparkSession
 
# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate() 

In [0]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='sourav', d=date(1999, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='suman', d=date(1996, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='sayantan', d=date(1997, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

Out[3]: DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [0]:
df = spark.createDataFrame([
    (1, 2., 'sourav', date(1999, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'suman', date(1996, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'sayantan', date(1997, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

Out[4]: DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [0]:
pandas_df = pd.DataFrame({
    '1st':[1,2,3],
    '2nd':[4.,5.,6.],
    '3rd':['suvam','subol','suhasish'],
    '4th':[date(1999,2,1),date(1889,3,1), date(2000,5,1)],
    '5th':[datetime(1999,2,1,7,0),datetime(1889,3,1,10,0),datetime(1997, 3, 1,10,0)]
})
df=spark.createDataFrame(pandas_df)
df

Out[5]: DataFrame[1st: bigint, 2nd: double, 3rd: string, 4th: date, 5th: timestamp]

In [0]:
rdd=spark.sparkContext.parallelize([
    (1,2.,'Suvam',date(1997,2,1),datetime(1997,2,1,5,0)),
    (2,3.,'Suryansh',date(1996,3,1),datetime(1998,6,2,5,0)),
    (3,5.,'Subol',date(1995,7,1),datetime(2000,1,3,7,0))
])
df = spark.createDataFrame(rdd, schema=['a','c','d','e'])
df

Out[6]: DataFrame[a: bigint, c: double, d: string, e: date, _5: timestamp]

In [0]:
df.show()
df.printSchema()

+---+---+--------+----------+-------------------+
|  a|  c|       d|         e|                 _5|
+---+---+--------+----------+-------------------+
|  1|2.0|   Suvam|1997-02-01|1997-02-01 05:00:00|
|  2|3.0|Suryansh|1996-03-01|1998-06-02 05:00:00|
|  3|5.0|   Subol|1995-07-01|2000-01-03 07:00:00|
+---+---+--------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- c: double (nullable = true)
 |-- d: string (nullable = true)
 |-- e: date (nullable = true)
 |-- _5: timestamp (nullable = true)



In [0]:
df.show(1)

+---+---+-----+----------+-------------------+
|  a|  c|    d|         e|                 _5|
+---+---+-----+----------+-------------------+
|  1|2.0|Suvam|1997-02-01|1997-02-01 05:00:00|
+---+---+-----+----------+-------------------+
only showing top 1 row



In [0]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 c   | 2.0                 
 d   | Suvam               
 e   | 1997-02-01          
 _5  | 1997-02-01 05:00:00 
only showing top 1 row



In [0]:
df.columns

Out[9]: ['a', 'c', 'd', 'e', '_5']

In [0]:
df = df.withColumnRenamed("_5", "f")
df


Out[10]: DataFrame[a: bigint, c: double, d: string, e: date, f: timestamp]

In [0]:
df.select("a", "c", "f").describe().show()

+-------+---+------------------+
|summary|  a|                 c|
+-------+---+------------------+
|  count|  3|                 3|
|   mean|2.0|3.3333333333333335|
| stddev|1.0|1.5275252316519468|
|    min|  1|               2.0|
|    max|  3|               5.0|
+-------+---+------------------+



In [0]:
simpleData = (("i-101","85123A","ABC",150,6,"2021-12-01 08:16:00","c-1001"), 
 ("i-102","85124A","XYZ",110,6,"2021-12-01 09:12:00","c-1002"), 
 ("i-103","85125A","MNO",100,4,"2021-12-01 10:00:00","c-1003"),
 ("i-104","85126A","VWA",102,5,"2021-12-01 10:31:00","c-1004"), 
 ("i-105","85127A","AAS",100,7,"2021-12-01 10:45:00","c-1005"),
 ("i-106","85128A","FAS",130,3,"2021-12-01 11:06:00","c-1006"), 
 ("i-107","85129A","AFA",175,6,"2021-12-01 11:15:00","c-1007"), 
 ("i-108","85130A","GAG",150,8,"2021-12-01 11:46:00","c-1008"), 
 ("i-109","85131A","AGG",180,8,"2021-12-01 12:56:00","c-1009"), 
 ("i-110","85132A","KKK",200,1,"2021-12-01 14:36:00","c-1010")) 
columns= ["invoice no", "product code", "descr", "unit price", "quantity", "invoice date", "customer id"] 
df = spark.createDataFrame(data = simpleData, schema = columns) 
df.printSchema()
 

  

root
 |-- invoice no: string (nullable = true)
 |-- product code: string (nullable = true)
 |-- descr: string (nullable = true)
 |-- unit price: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- invoice date: string (nullable = true)
 |-- customer id: string (nullable = true)



In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [0]:
from pyspark.sql.functions import sum
df.select(sum("quantity")).collect()

Out[16]: [Row(sum(quantity)=54)]

In [0]:
from pyspark.sql.functions import col
df1 = df.orderBy(col("quantity").desc()).head(3)
df1


Out[20]: [Row(invoice no='i-109', product code='85131A', descr='AGG', unit price=180, quantity=8, invoice date='2021-12-01 12:56:00', customer id='c-1009'),
 Row(invoice no='i-108', product code='85130A', descr='GAG', unit price=150, quantity=8, invoice date='2021-12-01 11:46:00', customer id='c-1008'),
 Row(invoice no='i-105', product code='85127A', descr='AAS', unit price=100, quantity=7, invoice date='2021-12-01 10:45:00', customer id='c-1005')]

In [0]:
df=df.withColumn('sales',col("unit price") * df.quantity)

In [0]:
df.show()

+----------+------------+-----+----------+--------+-------------------+-----------+-----+
|invoice no|product code|descr|unit price|quantity|       invoice date|customer id|sales|
+----------+------------+-----+----------+--------+-------------------+-----------+-----+
|     i-101|      85123A|  ABC|       150|       6|2021-12-01 08:16:00|     c-1001|  900|
|     i-102|      85124A|  XYZ|       110|       6|2021-12-01 09:12:00|     c-1002|  660|
|     i-103|      85125A|  MNO|       100|       4|2021-12-01 10:00:00|     c-1003|  400|
|     i-104|      85126A|  VWA|       102|       5|2021-12-01 10:31:00|     c-1004|  510|
|     i-105|      85127A|  AAS|       100|       7|2021-12-01 10:45:00|     c-1005|  700|
|     i-106|      85128A|  FAS|       130|       3|2021-12-01 11:06:00|     c-1006|  390|
|     i-107|      85129A|  AFA|       175|       6|2021-12-01 11:15:00|     c-1007| 1050|
|     i-108|      85130A|  GAG|       150|       8|2021-12-01 11:46:00|     c-1008| 1200|
|     i-10

In [0]:
df1 = df.orderBy(col("sales").desc()).head(3)
df1

Out[66]: [Row(invoice no='i-109', product code='85131A', descr='AGG', unit price=180, quantity=8, invoice date='2021-12-01 12:56:00', customer id='c-1009', sales=1440, dayOfWeek=1, month=12, year=2021),
 Row(invoice no='i-108', product code='85130A', descr='GAG', unit price=150, quantity=8, invoice date='2021-12-01 11:46:00', customer id='c-1008', sales=1200, dayOfWeek=1, month=12, year=2021),
 Row(invoice no='i-107', product code='85129A', descr='AFA', unit price=175, quantity=6, invoice date='2021-12-01 11:15:00', customer id='c-1007', sales=1050, dayOfWeek=1, month=12, year=2021)]

In [0]:
import datetime
from pyspark.sql.functions import year, month, dayofmonth

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import year, month, dayofmonth
df = df.withColumn('dayOfWeek', dayofmonth(col('invoice date')))
df = df.withColumn('month', month(col('invoice date')))
df = df.withColumn('year', year(col('invoice date')))
df.show()

+----------+------------+-----+----------+--------+-------------------+-----------+-----+---------+-----+----+
|invoice no|product code|descr|unit price|quantity|       invoice date|customer id|sales|dayOfWeek|month|year|
+----------+------------+-----+----------+--------+-------------------+-----------+-----+---------+-----+----+
|     i-101|      85123A|  ABC|       150|       6|2021-12-01 08:16:00|     c-1001|  900|        1|   12|2021|
|     i-102|      85124A|  XYZ|       110|       6|2021-12-01 09:12:00|     c-1002|  660|        1|   12|2021|
|     i-103|      85125A|  MNO|       100|       4|2021-12-01 10:00:00|     c-1003|  400|        1|   12|2021|
|     i-104|      85126A|  VWA|       102|       5|2021-12-01 10:31:00|     c-1004|  510|        1|   12|2021|
|     i-105|      85127A|  AAS|       100|       7|2021-12-01 10:45:00|     c-1005|  700|        1|   12|2021|
|     i-106|      85128A|  FAS|       130|       3|2021-12-01 11:06:00|     c-1006|  390|        1|   12|2021|
|

In [0]:
df.groupby(year("invoice date")).sum("sales").show()

+------------------+----------+
|year(invoice date)|sum(sales)|
+------------------+----------+
|              2021|      7450|
+------------------+----------+



In [0]:
df.groupby(dayofmonth("invoice date")).sum("sales").show()

+------------------------+----------+
|dayofmonth(invoice date)|sum(sales)|
+------------------------+----------+
|                       1|      7450|
+------------------------+----------+



In [0]:
from pyspark.sql.functions import quarter
df.groupBy(quarter("invoice date")).sum("sales").show()

+---------------------+----------+
|quarter(invoice date)|sum(sales)|
+---------------------+----------+
|                    4|      7450|
+---------------------+----------+



In [0]:
df.sort("sales").show(truncate=False)

+----------+------------+-----+----------+--------+-------------------+-----------+-----+---------+-----+----+
|invoice no|product code|descr|unit price|quantity|invoice date       |customer id|sales|dayOfWeek|month|year|
+----------+------------+-----+----------+--------+-------------------+-----------+-----+---------+-----+----+
|i-110     |85132A      |KKK  |200       |1       |2021-12-01 14:36:00|c-1010     |200  |1        |12   |2021|
|i-106     |85128A      |FAS  |130       |3       |2021-12-01 11:06:00|c-1006     |390  |1        |12   |2021|
|i-103     |85125A      |MNO  |100       |4       |2021-12-01 10:00:00|c-1003     |400  |1        |12   |2021|
|i-104     |85126A      |VWA  |102       |5       |2021-12-01 10:31:00|c-1004     |510  |1        |12   |2021|
|i-102     |85124A      |XYZ  |110       |6       |2021-12-01 09:12:00|c-1002     |660  |1        |12   |2021|
|i-105     |85127A      |AAS  |100       |7       |2021-12-01 10:45:00|c-1005     |700  |1        |12   |2021|
|

In [0]:
from pyspark.sql.functions import current_date
 
df2 = spark.createDataFrame([["2022-04-07"]],["currentDate"]) 
df2.show()

+-----------+
|currentDate|
+-----------+
| 2022-04-07|
+-----------+



In [0]:
from datetime import date, timedelta
from pyspark.sql.functions import date_add,last_day,trunc
dt2=date.today()
dt = date.today() + timedelta(5)
dt1 = dt2 -timedelta(5)
print('Current Date :',date.today())
print('5 days after Current Date :',dt)
print('5 days before current date :',dt1)

Current Date : 2022-04-08
5 days after Current Date : 2022-04-13
5 days before current date : 2022-04-03


In [0]:
from pyspark.sql.functions import col


In [0]:
print(dt2)

2022-04-08


In [0]:
df2.show()

+-----------+
|currentDate|
+-----------+
| 2022-04-07|
+-----------+



In [0]:
df2.withColumn("startdateofQuarter",trunc("currentDate","quarter")).withColumn("enddate",last_day(date_add(trunc("currentDate","quarter"),70))).show()

+-----------+------------------+----------+
|currentDate|startdateofQuarter|   enddate|
+-----------+------------------+----------+
| 2022-04-07|        2022-04-01|2022-06-30|
+-----------+------------------+----------+



In [0]:
def endOfMonthDate(col: Column): Column = {
  last_day(col)
}

In [0]:
def beginningOfMonthTime(col: Column): Column = {
  date_trunc("month", col)
}

In [0]:
df2.withColumn("month",month(df2.currentDate)).show()
df2.withColumn("Startday_Of_month",trunc(col("currentDate"),"month")).withColumn("end_of_month",last_day("currentDate")).show()


+-----------+-----+
|currentDate|month|
+-----------+-----+
| 2022-04-07|    4|
+-----------+-----+

+-----------+-----------------+------------+
|currentDate|Startday_Of_month|end_of_month|
+-----------+-----------------+------------+
| 2022-04-07|       2022-04-01|  2022-04-30|
+-----------+-----------------+------------+

