File path : /FileStore/tables/Beach_Weather_Stations___Automated_Sensors.csv

In [0]:
# List the files uploaded to DBFS to confirm the CSV path used below.
dbutils.fs.ls(
    '/FileStore/tables/'
)

### Data Reading

In [0]:
# Load the beach-weather sensor CSV. The first row holds the column names, and
# column types are inferred from the data (inferSchema costs an extra pass over the file).
data = spark.read.csv(
    '/FileStore/tables/Beach_Weather_Stations___Automated_Sensors.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Render the weather DataFrame as an interactive table (Databricks display helper).
display(data)

### Schema - DDL and StructType
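When reading the data, we can pass our own schema with `.schema(custom_schema)`, either as a DDL string (e.g. `'col_a string, col_b double'`) or as a `StructType`, instead of relying on `inferSchema`.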

In [0]:
# Print the column names and the types inferred at read time as a tree.
data.printSchema()

## Data Transformations

#### 1. SELECT Command

In [0]:
# Option 1: select by passing the column names directly as strings.
# data.select('Station Name', 'Humidity').display()

In [0]:
from pyspark.sql.functions import *

In [0]:
# Option 2: select with col() objects (needed for aliases, expressions and aggregations).
# col() takes exactly ONE column name, so build one Column per field.
data.select(col('Station Name'), col('Humidity')).display()

#### 2. Alias
This method is defined on Column objects (e.g. `col('...')`), not on plain column-name strings.

In [0]:
# Show the 'Station Name' column under the new name 'Data Source'.
(
    data
    .select(col('Station Name').alias('Data Source'))
    .display()
)

#### 3. FILTER/WHERE

In [0]:
# Case 1: keep only the rows reported by the Foster Weather Station
# (where() is an alias of filter()).
(
    data
    .where(col('Station Name') == 'Foster Weather Station')
    .display()
)


In [0]:
# Case 2: Foster Weather Station rows where the air temperature is greater than 20.
# Each condition is wrapped in parentheses because `&` binds tighter than `==` / `>`.
data.filter(
    (col('Station Name') == 'Foster Weather Station')
    & (col('Air Temperature') > 20)
).display()

In [0]:
# Case 3: rows from either of two stations whose 'Heading' value is null,
# combining isin() (value is in a list) with isNull().
(
    data
    .where(
        (col('Station Name').isin('63rd Street Weather Station', 'Foster Weather Station'))
        & (col('Heading').isNull())
    )
    .display()
)

#### 4. withColumnRenamed
`df.withColumnRenamed(existing_col_name, new_col_name)`: renames an existing column.

#### 5. withColumn (new column or modify existing column)
df.withColumn('new_col_name', lit('new'))

In [0]:
# Replace the station name 'Foster Weather Station' with 'Station 2'.
# withColumn() with an existing column name overwrites that column;
# regexp_replace() treats its second argument as a regular expression.
(
    data
    .withColumn(
        'Station Name',
        regexp_replace(col('Station Name'), 'Foster Weather Station', 'Station 2'),
    )
    .display()
)

#### 6. Type Casting: `col('col_name').cast('int')`

#### 5. Sort or Orderby
`.sort()` and `.orderBy()` are aliases; use `col('col_name').desc()` for descending order.

In [0]:
# Case 1: sort by a single column, highest humidity first
# (orderBy() is an alias of sort(); desc('c') is the same as col('c').desc()).
data.orderBy(desc('Humidity')).display()

In [0]:
# Case 2: sort on multiple columns: 'Air Temperature' ascending, then 'Humidity' descending.
(
    data
    .orderBy(col('Air Temperature').asc(), col('Humidity').desc())
    .display()
)

#### 7. Limit

In [0]:
# Keep at most 100 rows as a smaller working DataFrame for the examples below.
# Without an explicit sort, which rows are kept is not guaranteed.
df = data.limit(100)
display(df)

#### 8. Drop - To drop columns
data.drop(col('col_name'))
data.drop('col_1', 'col_2')

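#### 9. Drop Duplicates
`df.dropDuplicates()` removes fully duplicated rows; `df.dropDuplicates(['col_1', 'col_2'])` compares only the listed columns.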


### Date Functions
current_date(), date_add(), datediff()

In [0]:
# Add a column holding today's date (the same value for every row of the query).
df = df.withColumn(colName='Current Date', col=current_date())

In [0]:
# Add a column holding the date 7 days after 'Current Date'.
df = df.withColumn('Week After', date_add(col('Current Date'), 7))
display(df)

In [0]:
# Number of days between the two dates. datediff(end, start) returns end - start,
# so the later date ('Week After') goes first to get a positive count (7).
df = df.withColumn('Num of days', datediff('Week After', 'Current Date'))
df.display()

#### Changing date format with date_format

In [0]:
# Switch datetime parsing/formatting to the legacy (pre-Spark-3.0, SimpleDateFormat-based)
# behaviour. The simple 'yyyy-MM-dd' pattern below does not strictly need it.
spark.conf.set(
    "spark.sql.legacy.timeParserPolicy",
    "LEGACY",
)

In [0]:
# Format 'Current Date' as a 'yyyy-MM-dd' string. date_format() returns a string,
# so the column is no longer a DateType column after this step.
df = df.withColumn(
    'Current Date',
    date_format(col('Current Date'), 'yyyy-MM-dd'),
)
display(df)

### Handling Null values
Two options: a. Remove rows that contain null values (dropping nulls) <br>
b. Fill the null values with the mean or some other value (filling nulls)

In [0]:
# Drop rows whose 'Wet Bulb Temperature' is null; other columns are not checked.
df.na.drop(subset=['Wet Bulb Temperature']).display()

#### Filling values

In [0]:
# Replace nulls in 'Wet Bulb Temperature' with 0 (a numeric fill value is only
# applied to numeric columns).
df.na.fill(0, subset=['Wet Bulb Temperature']).display()

#### Group By with aggregate functions


In [0]:
# Total air temperature per station. `sum` here is pyspark.sql.functions.sum
# (pulled in by the wildcard import), not Python's builtin sum.
(
    df
    .groupBy('Station Name')
    .agg(sum('Air Temperature'))
    .display()
)

In [0]:
# Highest air temperature per station. `max` here is pyspark.sql.functions.max
# (pulled in by the wildcard import), not Python's builtin max.
(
    df
    .groupBy('Station Name')
    .agg(max('Air Temperature'))
    .display()
)

In [0]:
# Several aggregations over the same grouping (one group-by column, two aggregates),
# each given a readable output name with alias().
df.groupBy('Station Name').agg(
    max('Air Temperature').alias('Max Temp'),
    avg('Air Temperature').alias('Avg Temp'),
).display()

#### Collect List: group rows and gather each group's values into a single array column

In [0]:
# (user, book) pairs for the collect_list example. Stored in `book_data` rather than
# `data` so the weather DataFrame loaded at the top of the notebook is not overwritten.
book_data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book1'),
    ('user2', 'book2'),
    ('user2', 'book3'),
    ('user3', 'book1'),
    ('user3', 'book3'),
]

book_schema = 'user string, book string'

df_book = spark.createDataFrame(book_data, book_schema)
df_book.display()

In [0]:
# One row per user, with all of that user's books gathered into an array column.
(
    df_book
    .groupBy('user')
    .agg(collect_list('book'))
    .display()
)

### Pivot Tables

In [0]:
# Cafe capacity per (city, place), used for the pivot example.
cafe_data = [
    ('Mumbai', 'Kurla', 500),
    ('Mumbai', 'Dadat', 500),
    ('Mumbai', 'BKC', 200),
    ('Delhi', 'NCR', 1000),
    ('Delhi', 'DKC', 300),
    ('Kolkata', 'Babumushai', 400),
    ('Kolkata', 'Huwda', 500),
]

my_schema = 'city string, place string, capacity integer'

df_cafe = spark.createDataFrame(cafe_data, my_schema)
display(df_cafe)

In [0]:
# Pivot: one row per city, one column per distinct place, and each cell holds the
# average capacity (null where a city has no cafe at that place).
(
    df_cafe
    .groupBy('city')
    .pivot('place')
    .agg(avg('capacity'))
    .display()
)

### When-Otherwise

In [0]:
# New 'Temperature' column: 'high' when Air Temperature > 20, otherwise 'low'.
# Missing readings stay null. Without the first branch, a null comparison never
# matches when(), so a null temperature would fall through to otherwise('low').
df.withColumn(
    'Temperature',
    when(col('Air Temperature').isNull(), lit(None).cast('string'))
    .when(col('Air Temperature') > 20, 'high')
    .otherwise('low'),
).display()

### Joins
Join types: inner, left, right, full <br>
Anti join (`left_anti`): returns the rows of the left table that have no matching row in the right table

In [0]:
# Generic syntax: df1.join(df2, <join condition>, <join type>).
# Join types include 'inner', 'left', 'right', 'full', 'left_semi' and 'left_anti'.
# df1 / df2 are tiny sample DataFrames so that this cell runs on its own.
df1 = spark.createDataFrame([(1, 'a'), (2, 'b')], 'id integer, left_value string')
df2 = spark.createDataFrame([(1, 'x'), (3, 'y')], 'id integer, right_value string')
df1.join(df2, df1['id'] == df2['id'], 'inner').display()

### Examples for performing join operations
Two tables :- User and Order

In [0]:
# Sample users table: (user_id, name, age, city).
user_data = [
    (1, 'Alice', 25, 'New York'),
    (2, 'Bob', 30, 'Los Angeles'),
    (3, 'Charlie', 28, 'Chicago'),
    (4, 'Dana', 22, 'Houston'),
    (5, 'Evan', 35, 'Phoenix'),
    (6, 'Fiona', 27, 'San Francisco'),
    (7, 'George', 31, 'Seattle'),
    (8, 'Hannah', 24, 'Boston'),
    (9, 'Ian', 29, 'Denver'),
    (10, 'Judy', 33, 'Atlanta'),
    (11, 'Kevin', 40, 'Dallas'),
    (12, 'Laura', 26, 'San Diego'),
    (13, 'Mike', 34, 'Austin'),
    (14, 'Nina', 23, 'San Jose'),
    (15, 'Oliver', 36, 'Las Vegas'),
    (16, 'Paula', 38, 'Miami'),
    (17, 'Quinn', 29, 'Orlando'),
    (18, 'Rachel', 25, 'Portland'),
    (19, 'Sam', 37, 'Salt Lake City'),
    (20, 'Tina', 32, 'Philadelphia'),
    (21, 'Uma', 27, 'Baltimore'),
    (22, 'Victor', 41, 'Charlotte'),
    (23, 'Wendy', 30, 'Columbus'),
    (24, 'Xander', 28, 'Indianapolis'),
    (25, 'Yara', 35, 'Memphis'),
    (26, 'Zane', 26, 'Nashville'),
    (27, 'Alison', 22, 'Cincinnati'),
    (28, 'Brad', 39, 'Kansas City'),
    (29, 'Cindy', 21, 'Oklahoma City'),
    (30, 'Derek', 33, 'New Orleans'),
    (31, 'Elaine', 34, 'Louisville'),
    (32, 'Frank', 40, 'Minneapolis'),
    (33, 'Gloria', 29, 'Tampa'),
    (34, 'Harry', 32, 'Pittsburgh'),
    (35, 'Isla', 31, 'Cleveland'),
    (36, 'Jack', 28, 'Sacramento'),
    (37, 'Karen', 25, 'Albuquerque'),
    (38, 'Leon', 30, 'Milwaukee'),
    (39, 'Megan', 27, 'Tucson'),
    (40, 'Nathan', 36, 'Fresno'),
    (41, 'Olivia', 38, 'Mesa'),
    (42, 'Peter', 34, 'Sacramento'),
    (43, 'Quincy', 35, 'Virginia Beach'),
    (44, 'Rose', 26, 'Oakland'),
    (45, 'Steve', 39, 'Long Beach'),
    (46, 'Tara', 40, 'St. Louis'),
    (47, 'Ursula', 28, 'Raleigh'),
    (48, 'Vince', 37, 'Anchorage'),
    (49, 'Wes', 33, 'Santa Ana'),
    (50, 'Xena', 29, 'Bakersfield'),
]

user_schema = 'user_id integer, name string, age integer, city string'

users_df = spark.createDataFrame(user_data, user_schema)

In [0]:
# Append one extra user. The user_id must be new (51). Reusing an existing id
# (46 already belongs to Tara) would make every join on user_id attribute Tara's
# orders to this user too. union() matches columns by position, so the same schema is reused.
new_user_data = [(51, 'R', 25, 'New York')]

new_user_df = spark.createDataFrame(new_user_data, user_schema)
users_df = users_df.union(new_user_df)
users_df.display()

In [0]:
# Sample orders table: (order_id, user_id, order_month, order_amount).
orders_data = [
    (1, 1, 'January', 100.50),
    (2, 1, 'February', 75.00),
    (3, 2, 'January', 200.00),
    (4, 3, 'March', 150.00),
    (5, 4, 'April', 125.00),
    (6, 5, 'May', 300.00),
    (7, 6, 'June', 250.00),
    (8, 7, 'July', 175.50),
    (9, 8, 'August', 120.75),
    (10, 9, 'September', 180.00),
    (11, 10, 'October', 220.00),
    (12, 11, 'November', 400.00),
    (13, 12, 'December', 350.00),
    (14, 13, 'January', 95.00),
    (15, 14, 'February', 135.00),
    (16, 15, 'March', 80.00),
    (17, 16, 'April', 190.00),
    (18, 17, 'May', 240.00),
    (19, 18, 'June', 105.00),
    (20, 19, 'July', 300.00),
    (21, 20, 'August', 260.00),
    (22, 21, 'September', 110.00),
    (23, 22, 'October', 210.00),
    (24, 23, 'November', 280.00),
    (25, 24, 'December', 390.00),
    (26, 25, 'January', 50.00),
    (27, 26, 'February', 60.00),
    (28, 27, 'March', 75.00),
    (29, 28, 'April', 85.00),
    (30, 29, 'May', 95.00),
    (31, 30, 'June', 105.00),
    (32, 31, 'July', 115.00),
    (33, 32, 'August', 125.00),
    (34, 33, 'September', 135.00),
    (35, 34, 'October', 145.00),
    (36, 35, 'November', 155.00),
    (37, 36, 'December', 165.00),
    (38, 37, 'January', 175.00),
    (39, 38, 'February', 185.00),
    (40, 39, 'March', 195.00),
    (41, 40, 'April', 205.00),
    (42, 41, 'May', 215.00),
    (43, 42, 'June', 225.00),
    (44, 43, 'July', 235.00),
    (45, 44, 'August', 245.00),
    (46, 45, 'September', 255.00),
    (47, 46, 'October', 265.00),
    (48, 47, 'November', 275.00),
    (49, 48, 'December', 285.00),
    (50, 49, 'January', 295.00),
]

orders_schema = 'order_id integer, user_id integer, order_month string, order_amount double'

orders_df = spark.createDataFrame(orders_data, orders_schema)

#### Exercises
1. Find the names of users and their respective order amounts for all users who have placed at least one order.

In [0]:
# Inner join: only users that have at least one matching order.
# Both user_id columns are kept in the output.
(
    users_df
    .join(orders_df, on=users_df['user_id'] == orders_df['user_id'], how='inner')
    .display()
)

In [0]:
# Left join: every user; the order columns are null for users without orders.
(
    users_df
    .join(orders_df, on=users_df['user_id'] == orders_df['user_id'], how='left')
    .display()
)

SQL Query <br>
SELECT u.name, o.order_amount <br>
FROM users AS u <br>
INNER JOIN orders AS o <br>
ON u.user_id = o.user_id <br>


In [0]:
# Exercise 1: names of users who placed at least one order, with each order's id and amount.
(
    users_df
    .join(orders_df, on=users_df['user_id'] == orders_df['user_id'], how='inner')
    .select(users_df['name'], orders_df['order_id'], orders_df['order_amount'])
    .display()
)

2. List all users along with their order amounts. If a user hasn’t placed any orders, display NULL for the order amount. <br>
SELECT u.name, o.order_amount <br>
FROM users AS u<br>
LEFT JOIN orders AS o<br>
ON u.user_id = o.user_id<br>

In [0]:
# Exercise 2: every user with their orders; order_id / order_amount are NULL
# for users who have not placed any order.
(
    users_df
    .join(orders_df, on=users_df['user_id'] == orders_df['user_id'], how='left')
    .select(users_df['name'], orders_df['order_id'], orders_df['order_amount'])
    .display()
)

3. Find pairs of users who live in the same city. Exclude pairs where the user IDs are the same. <br>
SELECT u1.name, u2.name, u1.city <br>
FROM users AS u1 <br>
JOIN users AS u2 <br>
ON u1.city = u2.city AND u1.user_id != u2.user_id <br>

In [0]:
# Exercise 3: pairs of different users who live in the same city (self-join via aliases).
# Because the condition uses !=, each pair appears twice: as (A, B) and as (B, A).
(
    users_df.alias('u1')
    .join(
        users_df.alias('u2'),
        (col('u1.city') == col('u2.city')) & (col('u1.user_id') != col('u2.user_id')),
        'inner',
    )
    .select(col('u1.name'), col('u2.name'), col('u1.city'))
    .display()
)

4. Get the total amount spent by each user <br>
SELECT u.name, o.total_order_amount <br>
FROM users AS u<br>
JOIN (<br>
  SELECT user_id, SUM(order_amount) AS total_order_amount <br>
  FROM orders<br>
  GROUP BY user_id<br>
) AS o<br>
ON u.user_id = o.user_id

In [0]:
# Exercise 4: total order amount per user. Aggregate the orders first, then join to users.
# The aggregate already yields exactly (user_id, total_amount), so no extra select is needed.
(
    users_df.alias('u')
    .join(
        orders_df
        .groupBy('user_id')
        .agg(sum('order_amount').alias('total_amount'))
        .alias('o'),
        col('u.user_id') == col('o.user_id'),
        'inner',
    )
    .select(col('u.name'), col('o.total_amount'))
    .display()
)

### Window Functions
Window functions compute a value for every row from a set of related rows (the window), without collapsing the rows the way groupBy does.
Popular ones: row_number, rank, dense_rank <br>
row_number: assigns a sequential number to each row in the window's order (use cases: removing duplicates, creating surrogate keys)

In [0]:
from pyspark.sql.window import Window

In [0]:
# Number every user 1, 2, 3, ... in city order. The window has no partitionBy,
# so it spans the whole DataFrame. The order among users of the same city is
# not deterministic.
(
    users_df
    .withColumn('row_number', row_number().over(Window.orderBy('city')))
    .display()
)

In [0]:
# Rank users by age in descending order. rank() leaves gaps after ties
# (1, 1, 3, ...), while dense_rank() does not (1, 1, 2, ...).
(
    users_df
    .withColumn('row rank', rank().over(Window.orderBy(col('age').desc())))
    .withColumn('row dense rank', dense_rank().over(Window.orderBy(col('age').desc())))
    .display()
)

##### cumulative sum
The frame (e.g. `.rowsBetween(start, end)`) is defined on the window spec that is passed to `over()`.

In [0]:
# Running total ordered by user_id using the DEFAULT frame. With an orderBy and no
# explicit frame, Spark uses a range frame from unbounded preceding to the current row,
# so rows sharing the same user_id all receive the same running total.
(
    orders_df
    .withColumn('cumsum', sum('order_amount').over(Window.orderBy('user_id')))
    .display()
)

In [0]:
# Running total with an explicit ROWS frame: each row adds exactly its own amount.
# order_id breaks ties between orders of the same user; without it, the running
# totals of tied rows could differ from run to run.
orders_df.withColumn(
    'cumsum',
    sum('order_amount').over(
        Window.orderBy('user_id', 'order_id')
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ),
).display()

In [0]:
# Second batch of orders (users 7-23 each get another order), appended by position.
orders_schema = 'order_id integer, user_id integer, order_month string, order_amount double'

new_order_data = [
    (52, 7, 'July', 145.50),
    (65, 8, 'August', 170.75),
    (66, 9, 'September', 180.00),
    (67, 10, 'October', 220.00),
    (68, 11, 'November', 440.00),
    (53, 12, 'December', 350.00),
    (54, 13, 'January', 915.00),
    (55, 14, 'February', 105.00),
    (56, 15, 'March', 180.00),
    (57, 16, 'April', 290.00),
    (58, 17, 'May', 240.00),
    (59, 18, 'June', 105.00),
    (60, 19, 'July', 300.00),
    (61, 20, 'August', 240.00),
    (62, 21, 'September', 210.00),
    (63, 22, 'October', 290.00),
    (64, 23, 'November', 980.00),
]

new_orders_df = spark.createDataFrame(new_order_data, orders_schema)
orders_df = orders_df.union(new_orders_df)
display(orders_df)

In [0]:
# Running total after the union. Many users now have two orders, so order_id is
# added as a tie-breaker to make the ROWS-frame cumulative sum deterministic.
orders_df.withColumn(
    'Cumulative Sum',
    sum('order_amount').over(
        Window.orderBy('user_id', 'order_id')
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ),
).display()

### User Defined Functions (UDF)
Custom transformations written in plain Python. Prefer a built-in function when one exists, because Python UDFs are slower.

In [0]:
def my_func(x):
  """Return the square of ``x``, or ``None`` when ``x`` is ``None``.

  Spark passes SQL NULLs to Python UDFs as ``None``; returning ``None`` keeps
  them null instead of failing the task with a TypeError.
  """
  if x is None:
    return None
  return x * x

In [0]:
# Wrap my_func as a Spark UDF. Without an explicit return type, udf() defaults to
# StringType and the squared amounts would come back as strings, so declare 'double'.
# This assumes the input column is a double (order_amount is); a Python int result
# would not match a 'double' return type.
my_udf = udf(my_func, 'double')

In [0]:
# Apply the UDF row by row. my_func squares its input, so the column is named accordingly.
orders_df.withColumn('Squared Amount', my_udf('order_amount')).display()

In [0]:
# test