In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from datetime import datetime

spark = SparkSession.builder.appName("CountryRankingChange").getOrCreate()

# fb_actual_users
user_data = [
    (1, "Alice", "active", "USA"),
    (2, "Bob", "active", "India"),
    (3, "Charlie", "inactive", "USA"),
    (4, "David", "active", "UK"),
    (5, "Eve", "active", "India"),
    (6, "Frank", "active", "UK"),
    (7, "Grace", "active", "Canada")
]

user_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("status", StringType(), True),
    StructField("country", StringType(), True)
])

fb_actual_users = spark.createDataFrame(user_data, schema=user_schema)

# fb_comment_counts
comment_data = [
    (1, datetime(2021, 12, 5, 10), 5),
    (2, datetime(2021, 12, 10, 12), 8),
    (3, datetime(2021, 12, 15, 14), 2),
    (4, datetime(2021, 12, 20, 16), 3),
    (7, datetime(2021, 12, 15, 13), 4),
    (5, datetime(2022, 1, 5, 11), 7),
    (6, datetime(2022, 1, 10, 9), 7),
    (7, datetime(2022, 1, 15, 13), 6),
    (1, datetime(2022, 1, 20, 15), 3)
]

# 2021: india - 8 , usa - 7 , canada - 4, uk - 3

# 2022 : india -7, uk -7   , canada - 6 , usa -3 
# rank: 1,1,3,4
#dense_rank: 1,1,2,3



comment_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("created_dt", TimestampType(), True),
    StructField("no_comments", IntegerType(), True)
])

fb_comment_counts = spark.createDataFrame(comment_data, schema=comment_schema)


25/05/30 13:17:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
# Which countries have risen in rankings based on the number 
# of comments between December 2021 vs jan 2022(Avoid gaps between ranks)

from pyspark.sql.functions import col,sum, dense_rank
from pyspark.sql.window import Window

fb_actual_users.createOrReplaceTempView('users')

fb_comment_counts.createOrReplaceTempView('counts')

spark.sql("""
          with deck_2021 as (
          select country, sum(no_comments) as sum1, dense_rank() over (order By sum(no_comments) desc) as rank 
          from users u join counts c on u.user_id = c.user_id 
          where c.created_dt between date'2021-12-01' and date'2021-12-31'
          group by 1 
          ),
         deck_2022(
          select country, sum(no_comments) as sum1, dense_rank() over (order By sum(no_comments) desc) as rank 
          from users u join counts c on u.user_id = c.user_id 
          where c.created_dt between date'2022-01-01' and date'2022-01-31'
          group by 1 
          )

          select d2.country,d2.sum1  from deck_2021 d1 
          join deck_2022 d2
          on d1.country = d2.country 
          and d2.rank < d1.rank
          
          """)
# .show()

window_spec = Window.orderBy(sum(col('no_comments')).desc())
deck_2021 = fb_actual_users.join(fb_comment_counts,'user_id',how='left') \
    .filter("created_dt between date'2021-12-01' and date'2021-12-31'") \
        .groupBy(col('country')) \
        .agg(dense_rank().over(window_spec).alias("rank21")
)

deck_2022 = fb_actual_users.join(fb_comment_counts,'user_id',how='left') \
    .filter("created_dt between date'2022-01-01' and date'2022-01-31'") \
        .groupBy(col('country')) \
        .agg(dense_rank().over(window_spec).alias("rank22")
)

deck_2021.show()
deck_2022.show()


deck_2022.join(deck_2021, "country", how='inner').filter("rank22 < rank21").select("country").show()


25/05/30 13:17:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 1

+-------+------+
|country|rank21|
+-------+------+
|  India|     1|
|    USA|     2|
| Canada|     3|
|     UK|     4|
+-------+------+



25/05/30 13:17:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 1

+-------+------+
|country|rank22|
+-------+------+
|  India|     1|
|     UK|     1|
| Canada|     2|
|    USA|     3|
+-------+------+



25/05/30 13:17:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 13:17:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/30 1

+-------+
|country|
+-------+
| Canada|
|     UK|
+-------+



25/05/30 19:14:36 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 893800 ms exceeds timeout 120000 ms
25/05/30 19:14:36 WARN SparkContext: Killing executors is not supported by current scheduler.
25/05/30 19:14:37 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [None]:
You are given two strings word1 and word2. Merge the strings by adding letters in alternating order, starting with word1. 
If a string is longer than the other, append the additional letters onto the end of the merged string.

Return the merged string.

 

Example 1:

Input: word1 = "abc", word2 = "pqr"
Output: "apbqcr"
Explanation: The merged string will be merged as so:
word1:  a   b   c
word2:    p   q   r
merged: a p b q c r
Example 2:

Input: word1 = "ab", word2 = "pqrs"
Output: "apbqrs"
Explanation: Notice that as word2 is longer, "rs" is appended to the end.
word1:  a   b 
word2:    p   q   r   s
merged: a p b q   r   s
Example 3:

Input: word1 = "abcd", word2 = "pq"
Output: "apbqcd"
Explanation: Notice that as word1 is longer, "cd" is appended to the end.
word1:  a   b   c   d
word2:    p   q 
merged: a p b q c   d

In [None]:
str1 = 'abcqwertyui'
str2 = 'pqrs' 


res = ''
for i in range(1,max(len(str2),len(str1))+1):
    # print(i)
    res = res+str1[:1]+str2[:1]
    # print(res)
    str2 = str2[1:]
    str1 = str1[1:]
    # print(str1,str2)

res
# str1[:1]

'apbqcrqswertyui'

<h1>------ pyspark interview questions ---- <h2>

In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create SparkSession
spark = SparkSession.builder \
    .appName("Create Sales DataFrame") \
    .getOrCreate()

# Sample data
data = [
    ("East", "A", 100),
    ("East", "B", 150),
    ("East", "C", 120),
    ("West", "A", 200),
    ("West", "B", 180),
    ("West", "C", 210)
]

# Define schema
schema = StructType([
    StructField("region", StringType(), True),
    StructField("product", StringType(), True),
    StructField("revenue", IntegerType(), True)
])

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show the DataFrame
df.show()


25/05/20 16:46:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 349:>                                                        (0 + 3) / 3]

+------+-------+-------+
|region|product|revenue|
+------+-------+-------+
|  East|      A|    100|
|  East|      B|    150|
|  East|      C|    120|
|  West|      A|    200|
|  West|      B|    180|
|  West|      C|    210|
+------+-------+-------+



                                                                                

In [6]:
# Scenario:
# Find employees whose salary falls within the top 20% highest salaries in their respective departments.

# | emp_id | name    | department | salary |
# |--------|--------|-----------|--------|
# | 4      | David  | IT        | 4500   |
# | 5      | Eva    | Finance   | 5000   |
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, count, col, dense_rank,ceil
# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    ("John", "HR", 50000),
    ("Alice", "HR", 60000),
    ("Bob", "HR", 55000),
    ("Charlie", "HR", 80000),
    ("David", "HR", 75000),
    ("Eve", "Engineering", 90000),
    ("Frank", "Engineering", 110000),
    ("Grace", "Engineering", 95000),
    ("Heidi", "Engineering", 120000),
    ("Ivan", "Engineering", 100000),
    ("Judy", "Engineering", 98000),
    ("Kevin", "Sales", 45000),
    ("Laura", "Sales", 47000),
    ("Mallory", "Sales", 60000),
    ("Niaj", "Sales", 65000),
    ("Olivia", "Sales", 70000),
    ("John1", "HR", 55000),
    ("Alice1", "HR", 65000),
    ("Bob1", "HR", 56000),
    ("Charlie1", "HR", 85000),
    ("David1", "HR", 79000),
]

# Schema
schema = StructType([
    StructField("employee_name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True),
])

# Create DataFrame
employees_df = spark.createDataFrame(data, schema)

# Show DataFrame
employees_df.show()


window_spec = Window.partitionBy(col('department')).orderBy(col("salary").desc())

window_spec_sum = Window.partitionBy(col('department'))


df = employees_df.withColumn('rank',dense_rank().over(window_spec)) \
    .withColumn('dept_total',count('*').over(window_spec_sum)) \
    .filter(col('rank') <= ceil(col('dept_total')* 0.2))

df.show()

# git config --global user.name "hari"

# git config --global user.email "kishorehari228@gmail.com"

                                                                                

+-------------+-----------+------+
|employee_name| department|salary|
+-------------+-----------+------+
|         John|         HR| 50000|
|        Alice|         HR| 60000|
|          Bob|         HR| 55000|
|      Charlie|         HR| 80000|
|        David|         HR| 75000|
|          Eve|Engineering| 90000|
|        Frank|Engineering|110000|
|        Grace|Engineering| 95000|
|        Heidi|Engineering|120000|
|         Ivan|Engineering|100000|
|         Judy|Engineering| 98000|
|        Kevin|      Sales| 45000|
|        Laura|      Sales| 47000|
|      Mallory|      Sales| 60000|
|         Niaj|      Sales| 65000|
|       Olivia|      Sales| 70000|
|        John1|         HR| 55000|
|       Alice1|         HR| 65000|
|         Bob1|         HR| 56000|
|     Charlie1|         HR| 85000|
+-------------+-----------+------+
only showing top 20 rows





+-------------+-----------+------+----+----------+
|employee_name| department|salary|rank|dept_total|
+-------------+-----------+------+----+----------+
|        Heidi|Engineering|120000|   1|         6|
|        Frank|Engineering|110000|   2|         6|
|     Charlie1|         HR| 85000|   1|        10|
|      Charlie|         HR| 80000|   2|        10|
|       Olivia|      Sales| 70000|   1|         5|
+-------------+-----------+------+----+----------+



                                                                                

In [None]:
# 1. reverse a string
str1 = ['h','e','l','l','o']

str1[::-1]

# 2. highest non repetating no :
num = [4,5,1,2,5,4,3,8,3,8,2,1]
freq = {}

for i in num: 
    if i in freq:
        freq[i] += 1
    else:
        freq[i] = 1
freq
    
non_repetating = []
for i in num:
    if freq[i] == 1:
        non_repetating.append(i)
if non_repetating:
    print(max(non_repetating))
else:
    print(None)

# 3. mask the string asdfghjkl to as*****kl

str1 = 'asdfghjkl'

print(str1[:2]+'*'* (len(str1) - 4)+str1[-2:])
# ,'*'* len(str1) - 4 , str[:-2]
# type(len(str1))



None
as*****kl


In [5]:
# 4. sql highest 3 
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum,col, dense_rank
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Start Spark session
spark = SparkSession.builder.appName("TopProducts").getOrCreate()

# Sample data
data = [
    (1, 201, 10),
    (2, 202, 15),
    (3, 203, 20),
    (4, 201, 5),
    (5, 202, 10),
    (6, 204, 30)
]

# Define schema
columns = ["order_id", "product_id", "quantity"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.createOrReplaceTempView('orders')

spark.sql('''
select product_id, sum(quantity) from orders group by product_id order by 2 desc limit 3
''').show()

window_spec = Window.orderBy(sum(col('quantity')).desc())

df.groupBy("product_id").agg(sum('quantity').alias("sum"),dense_rank().over(window_spec).alias("rank")).filter(col("rank")<= 3).show()



+----------+-------------+
|product_id|sum(quantity)|
+----------+-------------+
|       204|           30|
|       202|           25|
|       203|           20|
+----------+-------------+



25/06/20 09:51:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/20 09:51:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/20 09:51:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/20 09:51:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/20 09:51:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/20 09:51:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/20 0

+----------+---+----+
|product_id|sum|rank|
+----------+---+----+
|       204| 30|   1|
|       202| 25|   2|
|       203| 20|   3|
+----------+---+----+



In [None]:
# print the numbers which is repeating n times in a row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, lead
from pyspark.sql.window import Window

# Start Spark session
spark = SparkSession.builder.appName("ConsecutiveNums").getOrCreate()

# Sample data
data = [
    (1, '1'), 1 , 0
    (2, '1'), 2 , 0 3
    (3, '1'), 3 , 0
    (4, '2'), 1 , 3 1
    (5, '1'), 4 , 1
    (6, '1'), 5 , 1 3
    (7, '1') 6 ,  1
]

# Define schema
columns = ["id", "num"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

df.createOrReplaceTempView('numbers')


spark.sql("""
          select num from (
select distinct num ,lag(num,1) over(order by id) as prev,lead(num,1) over( order by id ) as next from numbers 
          ) where num = prev = next
""").show()

query = f"""
WITH numbered AS (
    SELECT
        id,
        num,
        ROW_NUMBER() OVER (PARTITION BY num ORDER BY id) as rn
    FROM numbers
),
grouped AS (
    SELECT
        id,
        num,
        id - rn as grp
    FROM numbered
),
streaks AS (
    SELECT
        num,
        COUNT(*) as streak_len,
        MIN(id) as start_id,
        MAX(id) as end_id
    FROM grouped
    GROUP BY num, grp
)
SELECT *
FROM streaks
WHERE streak_len >= {n}
"""


from pyspark.sql.window import Window

# Define window spec
window_spec = Window.partitionBy("id").orderBy("date")

# Add lead and lag columns
df_with_lag_lead = df.withColumn("prev_value", lag("value").over(window_spec)) \
                     .withColumn("next_value", lead("value").over(window_spec))

df_with_lag_lead.show()



25/07/08 19:58:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/07/08 19:58:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/08 19:58:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/08 19:58:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/07/08 19:58:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+
|num|
+---+
|  1|
+---+



                                                                                

NameError: name 'n' is not defined

25/07/09 09:32:47 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 37057713 ms exceeds timeout 120000 ms
25/07/09 09:32:48 WARN SparkContext: Killing executors is not supported by current scheduler.
25/07/09 09:32:48 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, lead
from pyspark.sql.window import Window

# Start Spark session
spark = SparkSession.builder.appName("employee").getOrCreate()

# Sample data
data = [
    (1, '100'),
    (2, '200'),
    (3, '300')

]

# Define schema
columns = ["id", "salary"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

df.createOrReplaceTempView('employee')

spark.sql("""
          select coalesce(max(salary), Null) from (select salary from 
(select id, salary, rank() over (order by salary desc) as rank from employee )where rank = 2 )
          """).show()


25/05/28 09:32:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/28 09:32:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------------------------+
|coalesce(max(salary), NULL)|
+---------------------------+
|                        200|
+---------------------------+



25/05/28 09:32:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/28 09:32:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [None]:


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, lead
from pyspark.sql.window import Window

# Start Spark session
spark = SparkSession.builder.appName("employee").getOrCreate()

# Sample data
data = [
    (1, '100'),
    (2, '200'),
    (3, '300'),
    (4, '300')

]

# Define schema
columns = ["id", "score"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

df.createOrReplaceTempView('scores')

spark.sql("""
          select score, dense_rank() over (order by score desc) from scores
          """).show()

ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
# Question: Employee Salaries and Department Analysis
# You are given three DataFrames:
# employees
#  | emp_id | emp_name | dept_id | hire_date |
#  |--------|----------|---------|-----------|
#  | 1 | Alice | 10 | 2020-01-15|
#  | 2 | Bob | 20 | 2019-03-10|
#  | 3 | Charlie | 10 | 2021-07-23|
#  | 4 | David | 30 | 2018-06-01|
#  | 5 | Eva | 10 | 2020-11-12|
# departments
#  | dept_id | dept_name |
#  |---------|-------------|
#  | 10 | HR |
#  | 20 | IT |
#  | 30 | Finance |
# salaries
#  | emp_id | salary | effective_date |
#  |--------|--------|----------------|
#  | 1 | 70000 | 2021-01-01 |
#  | 1 | 75000 | 2022-01-01 |
#  | 2 | 90000 | 2020-01-01 |
#  | 3 | 60000 | 2022-01-01 |
#  | 5 | 65000 | 2021-01-01 |

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.sql.types import DateType,IntegerType,StringType,StructType,StructField
from datetime import datetime


def to_date(str1):
    return datetime.strptime(str1, "%Y-%m-%d").date()


spark = SparkSession.builder.appName("hari").getOrCreate()

employees_data = [
    (1, "Alice", 10, to_date("2020-01-15")),
    (2, "Bob", 20, to_date("2019-03-10")),
    (3, "Charlie", 10, to_date("2021-07-23")),
    (4, "David", 30, to_date("2018-06-01")),
    (5, "Eva", 10, to_date("2020-11-12"))
]

employees_schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("emp_name", StringType(), True),
    StructField("dept_id", IntegerType(), True),
    StructField("hire_date", DateType(), True)
])

employees_df = spark.createDataFrame(employees_data, schema=employees_schema)

# -------------------------------
# 2. DEPARTMENTS
# -------------------------------
departments_data = [
    (10, "HR"),
    (20, "IT"),
    (30, "Finance")
]

departments_schema = StructType([
    StructField("dept_id", IntegerType(), True),
    StructField("dept_name", StringType(), True)
])

departments_df = spark.createDataFrame(departments_data, schema=departments_schema)

# -------------------------------
# 3. SALARIES
# -------------------------------
salaries_data = [
    (1, 70000, to_date("2021-01-01")),
    (1, 75000, to_date("2022-01-01")),
    (2, 90000, to_date("2020-01-01")),
    (3, 60000, to_date("2022-01-01")),
    (5, 65000, to_date("2021-01-01"))
]

salaries_schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("effective_date", DateType(), True)
])

salaries_df = spark.createDataFrame(salaries_data, schema=salaries_schema)


# Write PySpark code to:
# Get each employee’s latest salary based on effective_date.
# Join with department info.
# For each department, compute:
# Number of employees
# Average salary
# Name of the highest paid employee in that department

from pyspark.sql.window import Window

window_spec = Window.partitionBy("emp_id").orderBy(col("effective_date").desc())
window_salary = Window.partitionBy("dept_id").orderBy(col("salary").desc())


salaries_df = salaries_df.withColumn("rank", dense_rank().over(window_spec)).filter(col("rank")== lit(1))

employees_df.join(departments_df,on="dept_id", how="left").join(salaries_df, on="emp_id",how="left")\
.groupBy("dept_name").agg(count(col('emp_id').alias("count_n")),avg(col('salary').alias("avg_sal"))).show()

employees_df.join(departments_df,on="dept_id", how="left").join(salaries_df, on="emp_id",how="left")\
.withColumn("sal_rank", dense_rank().over(window_salary)).filter("sal_rank = 1") \
    .fillna({'salary':1000}) \
.withColumn("bucket", when(col("salary")>= 100000, "high")
            .when((col("salary") >= 50000) & (col("salary") < 100000), "medium").otherwise("low")).show()



                                                                                

+---------+------------------------+----------------------+
|dept_name|count(emp_id AS count_n)|avg(salary AS avg_sal)|
+---------+------------------------+----------------------+
|       HR|                       3|     66666.66666666667|
|  Finance|                       1|                  NULL|
|       IT|                       1|               90000.0|
+---------+------------------------+----------------------+



                                                                                

+------+-------+--------+----------+---------+------+--------------+----+--------+------+
|emp_id|dept_id|emp_name| hire_date|dept_name|salary|effective_date|rank|sal_rank|bucket|
+------+-------+--------+----------+---------+------+--------------+----+--------+------+
|     1|     10|   Alice|2020-01-15|       HR| 75000|    2022-01-01|   1|       1|medium|
|     2|     20|     Bob|2019-03-10|       IT| 90000|    2020-01-01|   1|       1|medium|
|     4|     30|   David|2018-06-01|  Finance|  1000|          NULL|NULL|       1|   low|
+------+-------+--------+----------+---------+------+--------------+----+--------+------+



25/06/07 08:45:28 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 888637 ms exceeds timeout 120000 ms
25/06/07 08:45:28 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/07 08:45:35 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [13]:
# Spark Interview Question – Real-World Problem

# 🚀 Problem Statement:
#  You're given two DataFrames:
# employees – contains employee details:
#  (employee_id, name, salary, department_id)
# departments – contains department metadata:
#  (department_id, department_name)
# 🎯 Objective:
#  Fill in the missing (null) salaries in the employees DataFrame with the average salary of that respective department.

# employees 
# +-----------+-------+------+-------------+
# |employee_id| name|salary|department_id|
# +-----------+-------+------+-------------+
# | 1| Alice| 70000| 10|
# | 2| Bob| NULL| 20|
# | 3|Charlie| 80000| 10|
# | 4| David| NULL| 30|
# | 5| Eve| 75000| 20|
# | 6| Frank| 90000| 10|
# | 7| Grace| 52000| 30|
# | 8| Hannah| 62000| 20|
# | 9| Isaac| NULL| 30|
# | 10| Jack| 71000| 20|
# +-----------+-------+------+-------------+

# departments :
# +-------------+---------------+
# |department_id|department_name|
# +-------------+---------------+
# | 10| Engineering|
# | 20| HR|
# | 30| Marketing|
# +-------------+---------------+

# ✅ Solution in PySpark:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("FillNullSalaries").getOrCreate()

# Data
employee_data = [
 (1, "Alice", 70000, 10),
 (2, "Bob", None, 20),
 (3, "Charlie", 80000, 10),
 (4, "David", None, 30),
 (5, "Eve", 75000, 20),
 (6, "Frank", 90000, 10),
 (7, "Grace", 52000, 30),
 (8, "Hannah", 62000, 20),
 (9, "Isaac", None, 30),
 (10, "Jack", 71000, 20)
]

department_data = [
 (10, "Engineering"),
 (20, "HR"),
 (30, "Marketing")
]


employees_df = spark.createDataFrame(employee_data, schema=["employee_id", "name", "salary", "department_id"])
departments_df = spark.createDataFrame(department_data, ["department_id", "department_name"])


dept_avg_df = employees_df.groupBy("department_id").agg(F.round(F.avg('salary'), 1).alias("avg_sal"))


employees_df.join(dept_avg_df , dept_avg_df.department_id == employees_df.department_id, how = 'left')\
.withColumn("salary", F.coalesce(F.col('salary'),F.col("avg_sal")))\
    .drop(dept_avg_df.department_id).show()







[Stage 13:>                                                         (0 + 8) / 8]

+-----------+-------+-------+-------------+-------+
|employee_id|   name| salary|department_id|avg_sal|
+-----------+-------+-------+-------------+-------+
|          1|  Alice|70000.0|           10|80000.0|
|          2|    Bob|69333.3|           20|69333.3|
|          3|Charlie|80000.0|           10|80000.0|
|          4|  David|52000.0|           30|52000.0|
|          5|    Eve|75000.0|           20|69333.3|
|          6|  Frank|90000.0|           10|80000.0|
|          7|  Grace|52000.0|           30|52000.0|
|          8| Hannah|62000.0|           20|69333.3|
|          9|  Isaac|52000.0|           30|52000.0|
|         10|   Jack|71000.0|           20|69333.3|
+-----------+-------+-------+-------------+-------+



                                                                                

In [None]:
# Filter out customers whose names start with the letter 'A' and display all their details using PySpark.

# Sample Data :
data = [
 (1, "Aman Gupta", 28, "Delhi"),
 (2, "Ravi Kumar", 34, "Bangalore"),
 (3, "Anjali Rai", 25, "Mumbai"),
 (4, "Priya Mehta", 30, "Hyderabad"),
 (5, "Arjun Verma", 32, "Pune"),
 (6, "Sneha Jain", 27, "Kolkata"),
 (7, "Ajay Singh", 29, "Lucknow")
]

# Column names
columns = ["CustomerID", "Name", "Age", "City"]

customer_df = spark.createDataFrame(data, columns)


customer_df.filter(F.startswith("Name",F.lit("A"))).show()

+----------+-----------+---+-------+
|CustomerID|       Name|Age|   City|
+----------+-----------+---+-------+
|         1| Aman Gupta| 28|  Delhi|
|         3| Anjali Rai| 25| Mumbai|
|         5|Arjun Verma| 32|   Pune|
|         7| Ajay Singh| 29|Lucknow|
+----------+-----------+---+-------+



25/06/09 09:26:29 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 899524 ms exceeds timeout 120000 ms
25/06/09 09:26:29 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/09 09:26:31 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [None]:
str1 = "asd hdif in a abcd in a this asd"
del1 = ['in','a','this']
# word count :

#python
count = {}
for i in [i for i in str1.split() if i not in del1]:
    c = 0
    for j in str1.split():
        if i==j:
            c+=1
    count.update({i:c})

count



{'asd': 2, 'hdif': 1, 'abcd': 1}

In [4]:
# Step 1: Create SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split
spark = SparkSession.builder.appName("WordCountDF").getOrCreate()

# Step 2: Read the text file into a DataFrame

data = [("asd hdif in a abcd in a this asd",)]

df = spark.createDataFrame(data, ['value'])

# Step 3: Split lines into words
words_df = df.select(explode(split(col("value"), " ")).alias("word"))

# Step 4: Group by word and count
word_count_df = words_df.filter(~col('word').isin(['in','a'])).groupBy("word").count()

# Step 5: Display results
word_count_df.show()




# sql :

# count of characters given in a string 

# df.createOrReplaceTempView("")

spark.sql(""" select length('asd hdif in a abcd in a this asd') - 
          length(replace('asd hdif in a abcd in a this asd','a', ''))
""").show()


spark.sql(""" select word, count(*) from (
          select explode(split('asd hdif in a abcd in a this asd', ' ')) as word) where word not in ('in','a')
          group by word""").show()




+----+-----+
|word|count|
+----+-----+
|abcd|    1|
| asd|    2|
|this|    1|
|hdif|    1|
+----+-----+

+---------------------------------------------------------------------------------------------------+
|(length(asd hdif in a abcd in a this asd) - length(replace(asd hdif in a abcd in a this asd, a, )))|
+---------------------------------------------------------------------------------------------------+
|                                                                                                  5|
+---------------------------------------------------------------------------------------------------+

+----+--------+
|word|count(1)|
+----+--------+
|abcd|       1|
| asd|       2|
|this|       1|
|hdif|       1|
+----+--------+



In [23]:
# consecutive sensor reading greater than 10 in sql

from pyspark.sql.types import StringType,DoubleType,TimestampType,StructField,StructType
from datetime import datetime

data = [
    ("s1", 23.5, datetime(2024, 6, 1, 10, 0)),
    ("s1", 24.0, datetime(2024, 6, 1, 11, 1)),
    ("s1", 23.5, datetime(2024, 6, 1, 12, 0)),
    ("s1", 24.0, datetime(2024, 6, 1, 13, 0)),
    ("s1", 24.0, datetime(2024, 6, 1, 14, 0)),
    ("s2", 30.1, datetime(2024, 6, 1, 10, 0)),
    ("s2", 29.8, datetime(2024, 6, 1, 11, 0)),
    ("s3", 19.6, datetime(2024, 6, 1, 10, 0)),
]

# Define schema
schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField("time", TimestampType(), True),
])


df = spark.createDataFrame(data, schema)

df.createOrReplaceTempView("sensors")


spark.sql("""select * from sensors order by time""").show()

spark.sql(""" 
 WITH ordered AS (
  SELECT *,
         ROW_NUMBER() OVER (ORDER BY time) AS rn
  FROM sensors
),

grouped AS (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY sensor_id ORDER BY time) AS sensor_rn
  FROM ordered
),

gaps_and_islands AS (
  SELECT *,
         rn - sensor_rn AS group_id
  FROM grouped
)
          select * from grouped order by rn""").show()

spark.sql(""" 
 WITH ordered AS (
  SELECT *,
         ROW_NUMBER() OVER (ORDER BY time) AS rn
  FROM sensors
),

grouped AS (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY sensor_id ORDER BY time) AS sensor_rn
  FROM ordered
),

gaps_and_islands AS (
  SELECT *,
         rn - sensor_rn AS group_id
  FROM grouped
),

streaks AS (
  SELECT
    sensor_id,
    group_id,
    COUNT(*) AS consecutive_streak
  FROM gaps_and_islands
  GROUP BY sensor_id, group_id
)

SELECT 
  sensor_id,
  MAX(consecutive_streak) AS consecutive_streak
FROM streaks
GROUP BY sensor_id;

       



 """).show()

                                                                                

+---------+-----+-------------------+
|sensor_id|value|               time|
+---------+-----+-------------------+
|       s3| 19.6|2024-06-01 10:00:00|
|       s2| 30.1|2024-06-01 10:00:00|
|       s1| 23.5|2024-06-01 10:00:00|
|       s2| 29.8|2024-06-01 11:00:00|
|       s1| 24.0|2024-06-01 11:01:00|
|       s1| 23.5|2024-06-01 12:00:00|
|       s1| 24.0|2024-06-01 13:00:00|
|       s1| 24.0|2024-06-01 14:00:00|
+---------+-----+-------------------+



25/06/09 17:16:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 17:16:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 17:16:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 17:16:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+---------+-----+-------------------+---+---------+
|sensor_id|value|               time| rn|sensor_rn|
+---------+-----+-------------------+---+---------+
|       s1| 23.5|2024-06-01 10:00:00|  1|        1|
|       s2| 30.1|2024-06-01 10:00:00|  2|        1|
|       s3| 19.6|2024-06-01 10:00:00|  3|        1|
|       s2| 29.8|2024-06-01 11:00:00|  4|        2|
|       s1| 24.0|2024-06-01 11:01:00|  5|        2|
|       s1| 23.5|2024-06-01 12:00:00|  6|        3|
|       s1| 24.0|2024-06-01 13:00:00|  7|        4|
|       s1| 24.0|2024-06-01 14:00:00|  8|        5|
+---------+-----+-------------------+---+---------+



25/06/09 17:16:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 17:16:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 17:16:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 17:16:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------+------------------+
|sensor_id|consecutive_streak|
+---------+------------------+
|       s1|                 4|
|       s2|                 1|
|       s3|                 1|
+---------+------------------+



                                                                                

In [4]:
# 𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧
# You are working as a Data Engineer on a big data processing pipeline, 
# where you need to analyze a large dataset from a retail business. This 
# dataset contains transaction information of customers, including a unique 
# transaction ID, customer ID, product ID, transaction amount, and date. The 
# dataset has an index column, but you are required to include this index as a 
# regular column in the DataFrame to perform some further transformations and joins. 
# Your task is to convert the index of the PySpark DataFrame into a column while preserving the dataset's integrity.

# 𝐬𝐜𝐡𝐞𝐦𝐚 
data = [ (1001, 5001, 200, "2025-01-01"), 
(1002, 5002, 450, "2025-01-02"), 
(1003, 5003, 300, "2025-01-03"), 
(1004, 5004, 150, "2025-01-04"), 
(1005, 5005, 500, "2025-01-05"), ] 

# Schema for DataFrame 
columns = ["Customer_ID", "Product_ID", "Amount", "Date"] 


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id


spark = SparkSession.builder.appName("hari_test").getOrCreate()

customer_df = spark.createDataFrame(data, columns)


customer_df.withColumn("Index_Column", monotonically_increasing_id()).show()


+-----------+----------+------+----------+------------+
|Customer_ID|Product_ID|Amount|      Date|Index_Column|
+-----------+----------+------+----------+------------+
|       1001|      5001|   200|2025-01-01|  8589934592|
|       1002|      5002|   450|2025-01-02| 25769803776|
|       1003|      5003|   300|2025-01-03| 34359738368|
|       1004|      5004|   150|2025-01-04| 51539607552|
|       1005|      5005|   500|2025-01-05| 60129542144|
+-----------+----------+------+----------+------------+



In [None]:

person	src	dest
John	BOM	HYD
John	HYD	BOM
Alice	BOM	DEL
Alice	DEL	BOM
Bob	BOM	HYD
Bob	DEL	BOM
John	HYD	DEL
Alice	DEL	HYD
Bob	HYD	BOM
John	BOM	DEL
 
 
 
city1	city2	total_visits
BOM	HYD	4
BOM	DEL	4
DEL	HYD	2

spark.sql("""SELECT 
    CASE 
        WHEN src < dest THEN src 
        ELSE dest 
    END AS city1,
    CASE 
        WHEN src < dest THEN dest 
        ELSE src 
    END AS city2,
    COUNT(*) AS total_visits
FROM travel_data
GROUP BY 
    CASE 
        WHEN src < dest THEN src 
        ELSE dest 
    END,
    CASE 
        WHEN src < dest THEN dest 
        ELSE src 
    END;
""")



from pyspark.sql.functions import col, when, least, greatest, count
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    ("John", "BOM", "HYD"),
    ("John", "HYD", "BOM"),
    ("Alice", "BOM", "DEL"),
    ("Alice", "DEL", "BOM"),
    ("Bob", "BOM", "HYD"),
    ("Bob", "DEL", "BOM"),
    ("John", "HYD", "DEL"),
    ("Alice", "DEL", "HYD"),
    ("Bob", "HYD", "BOM"),
    ("John", "BOM", "DEL")
]

columns = ["person", "src", "dest"]
df = spark.createDataFrame(data, columns)

# Normalize city pairs
df_normalized = df.withColumn("city1", least("src", "dest")) \
                  .withColumn("city2", greatest("src", "dest"))

# Group and count
result = df_normalized.groupBy("city1", "city2").agg(count("*").alias("total_visits"))

result.show()


SyntaxError: invalid syntax (2073563035.py, line 1)

In [4]:
# country and sales
# groupby country and sum(sales)
# 30 distinct values in country
# Australia - 72% of data
# use salting in pyspark

from pyspark.sql.functions import when, rand, concat_ws, col, floor, lit

# Number of salt buckets
salt_factor = 10

# Add salt only to skewed key
df_salted = df.withColumn(
    "country_salted",
    when(col("country") == "Australia",
         concat_ws("_", col("country"), (floor(rand() * salt_factor)).cast("int"))
    ).otherwise(col("country"))
)
# 
from pyspark.sql.functions import sum as _sum

df_partial_agg = df_salted.groupBy("country_salted").agg(
    _sum("sales").alias("partial_sum")
)
# 
from pyspark.sql.functions import split

df_final = df_partial_agg.withColumn(
    "country",
    when(col("country_salted").startswith("Australia"),
         split(col("country_salted"), "_").getItem(0)
    ).otherwise(col("country_salted"))
).groupBy("country").agg(
    _sum("partial_sum").alias("total_sales")
)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `country` cannot be resolved. Did you mean one of the following? [`city`, `id`, `name`].;
'Project [id#28L, name#29, city#30, CASE WHEN ('country = Australia) THEN concat_ws(_, 'country, cast(FLOOR((rand(5255242342487690508) * cast(10 as double))) as int)) ELSE 'country END AS country_salted#51]
+- LogicalRDD [id#28L, name#29, city#30], false


In [2]:
# id, name, city
 
# 100, raman, delhi
# 200, ashish, mumbai, bangalore
# 300, preeti, chennai
 

# 100, raman, delhi
# 200, ashish, mumbai
# 300, preeti, chennai

# in pyspark explode

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, trim

# Create Spark session
spark = SparkSession.builder.appName("ExplodeCities").getOrCreate()

# Sample data
data = [
    (100, "raman", "delhi"),
    (200, "ashish", "mumbai, bangalore"),
    (300, "preeti", "chennai")
]

# Create DataFrame
df = spark.createDataFrame(data, ["id", "name", "city"])

# Split and explode cities into multiple rows
df_exploded = df.withColumn("city", explode(split("city", ","))) \
                .withColumn("city", trim("city"))  # remove any extra spaces

df_exploded.show(truncate=False)


# in pyspark omiting 

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, trim

# Create Spark session
spark = SparkSession.builder.appName("FirstCity").getOrCreate()

# Sample data
data = [
    (100, "raman", "delhi"),
    (200, "ashish", "mumbai, bangalore"),
    (300, "preeti", "chennai")
]

# Create DataFrame
df = spark.createDataFrame(data, ["id", "name", "city"])

# Extract the first city from the comma-separated list
df_first_city = df.withColumn("city", trim(split("city", ",").getItem(0)))

df_first_city.show(truncate=False)



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/12 16:50:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+---+------+---------+
|id |name  |city     |
+---+------+---------+
|100|raman |delhi    |
|200|ashish|mumbai   |
|200|ashish|bangalore|
|300|preeti|chennai  |
+---+------+---------+



25/06/12 16:50:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+------+-------+
|id |name  |city   |
+---+------+-------+
|100|raman |delhi  |
|200|ashish|mumbai |
|300|preeti|chennai|
+---+------+-------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 51514)
Traceback (most recent call last):
  File "/opt/anaconda3/envs/pe/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/envs/pe/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/anaconda3/envs/pe/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/anaconda3/envs/pe/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/opt/anaconda3/envs/pe/lib/python3.10/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/opt/anaconda3/envs/pe/lib/python3.10/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
  File "/opt/anaconda3/envs/pe/l

In [None]:
# how to forward fill a string for subsiquent null values in sql and pyspark


from pyspark.sql.window import Window
from pyspark.sql.functions import last, col

window_spec = Window.partitionBy("id").orderBy("date").rowsBetween(Window.unboundedPreceding, 0)

df_filled = df.withColumn("status_filled", last("status", ignorenulls=True).over(window_spec))



spark.sql("""SELECT *,
  LAST_VALUE(status IGNORE NULLS) OVER (
    PARTITION BY id ORDER BY date
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS status_filled
FROM your_table;""")


In [None]:
# count of not null values from top row to the current row

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as _sum
from pyspark.sql.window import Window

# Create Spark session
spark = SparkSession.builder.appName("NonNullCount").getOrCreate()

# Sample data
data = [(1, "A"), (2, None), (3, "B"), (4, None), (5, "C")]
df = spark.createDataFrame(data, ["id", "value"])

# Define window spec ordered by ID
window_spec = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)

# Add non-null cumulative count column
df_with_count = df.withColumn(
    "non_null_count",
    _sum(when(col("value").isNotNull(), 1).otherwise(0)).over(window_spec)
)

df_with_count.show()


spark.sql("""SELECT 
  id,
  value,
  SUM(CASE WHEN value IS NOT NULL THEN 1 ELSE 0 END)
    OVER (ORDER BY id ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) 
    AS non_null_count
FROM my_table;
""")



In [None]:
# We have been given purchasing activity DF and we need 
# to find out cumulative purchases of each product over 
# time.

from pyspark.sql.types import StructType,StructField,IntegerType,StringType



# Define schema for the DataFrame 
schema = StructType([ 
 StructField("order_id", IntegerType(), True), 
 StructField("product_type", StringType(), True), 
 StructField("quantity", IntegerType(), True), 
 StructField("order_date", StringType(), True), 
]) 
  
# Define data 
# Define data 
data = [ 
 (213824, 'printer', 20, "2022-06-27 "), 
 (212312, 'hair dryer', 5, "2022-06-28 "), 
 (132842, 'printer', 18, "2022-06-28 "), 
 (284730, 'standing lamp', 8, "2022-07-05 ") 
]  
#         Step - 3 : Writing the pyspark code to solve 
# the problem 
order_df=spark.createDataFrame(data,schema) 
order_df.show() 
 
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

window_spec = Window().partitionBy(col('product_type')).orderBy('order_date').rowsBetween(Window.unboundedPreceding,0)


order_df.withColumn("cummulative_count", sum('quantity').over(window_spec)).show()

# sql
select *, sum(quantity).over(
    partition by product_type order by order_date rows between unbound proceedings and current row)
    from products


+--------+-------------+--------+-----------+
|order_id| product_type|quantity| order_date|
+--------+-------------+--------+-----------+
|  213824|      printer|      20|2022-06-27 |
|  212312|   hair dryer|       5|2022-06-28 |
|  132842|      printer|      18|2022-06-28 |
|  284730|standing lamp|       8|2022-07-05 |
+--------+-------------+--------+-----------+

+--------+-------------+--------+-----------+-----------------+
|order_id| product_type|quantity| order_date|cummulative_count|
+--------+-------------+--------+-----------+-----------------+
|  212312|   hair dryer|       5|2022-06-28 |                5|
|  213824|      printer|      20|2022-06-27 |               20|
|  132842|      printer|      18|2022-06-28 |               38|
|  284730|standing lamp|       8|2022-07-05 |                8|
+--------+-------------+--------+-----------+-----------------+



25/06/13 15:23:30 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 559732 ms exceeds timeout 120000 ms
25/06/13 15:23:30 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/13 15:23:33 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [21]:
# 1. Load the following sample dataset into a PySpark DataFrame:
# 2. Perform the following operations:
# a. Replace all NULL values in the Quantity column with 0.
# b. Replace all NULL values in the Price column with the average price of the existing data.
# c. Drop rows where the Product column is NULL.
# d. Fill missing Sales_Date with a default value of '2025-01-01'.
# e. Drop rows where all columns are NULL.


data = [ (1, "Laptop", 10, 50000, "North", "2025-01-01"),
         (2, "Mobile", None, 15000, "South", None), 
         (3, "Tablet", 20, None, "West", "2025-01-03"), 
         (4, "Desktop", 15, 30000, None, "2025-01-04"), 
         (5, None, None, None, "East", "2025-01-05"),
          (6, 'mouse', None, None, "East", "2025-01-10") ]

columns = ["Sales_ID", "Product", "Quantity", "Price", "Region", "Sales_Date"]


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, mean, round


spark = SparkSession.builder.appName('ibm_questions').getOrCreate()

df = spark.createDataFrame(data, columns)

avg_price = df.select(round(mean("price"),0)).collect()[0][0]

df = df.fillna({'quantity':0, 'price':avg_price, 'Sales_Date':'2025-01-01'}).dropna(subset = ['product','region']) \
    .dropna(how='all')

df.show()







+--------+-------+--------+-----+------+----------+
|Sales_ID|Product|Quantity|Price|Region|Sales_Date|
+--------+-------+--------+-----+------+----------+
|       1| Laptop|      10|50000| North|2025-01-01|
|       2| Mobile|       0|15000| South|2025-01-01|
|       3| Tablet|      20|31667|  West|2025-01-03|
|       6|  mouse|       0|31667|  East|2025-01-10|
+--------+-------+--------+-----+------+----------+



In [48]:
# Task:
# ✨ Extract domain and create slug version:
#  ➡️ Replace @gmail.com → gmail, @yahoo.com → yahoo, etc.
# ✨ Derive quarter of join (Q1–Q4) from month.
# ✨ Create active_flag:
#  ➡️ If joined in last 365 days → "Active" else "Inactive"
# ✨Format designation to title case.
# ✨ Rename email to email_address.

data = [("Karthik", "karthik@gmail.com", "2024-12-12", "data Engineer", 85000),
 ("Veer", "veer@yahoo.com", "2022-11-01", "Business Analyst", 65000),
 ("Veena", "veena@company.org", "2021-06-15", "Data Scientist", 105000),
 ("Vinay", "vinay@gmail.com", "2024-01-10", "Intern", 25000),
 ("Vijay", "vijay@hotmail.com", "2020-05-22", "Senior Manager", 125000)]

columns = ["name", "email", "join_date", "designation", "salary"]

from pyspark.sql.functions import quarter, concat,split, lit, trim, when, datediff, current_date, to_date, initcap

df = spark.createDataFrame(data, columns)

df.withColumn('domain', split(trim(split(col('email'),'@').getItem(1)),'\.').getItem(0)) \
.withColumn('quarter',concat(lit('Q'),quarter(col('join_date')))) \
.withColumn('status', when(datediff(current_date(),to_date(col('join_date'))) <= 365,'ACTIVE').otherwise('INACTIVE'))\
.withColumnRenamed('email', 'email_address')\
.withColumn('designation', initcap('designation'))\
.show()

+-------+-----------------+----------+----------------+------+-------+-------+--------+
|   name|    email_address| join_date|     designation|salary| domain|quarter|  status|
+-------+-----------------+----------+----------------+------+-------+-------+--------+
|Karthik|karthik@gmail.com|2024-12-12|   Data Engineer| 85000|  gmail|     Q4|  ACTIVE|
|   Veer|   veer@yahoo.com|2022-11-01|Business Analyst| 65000|  yahoo|     Q4|INACTIVE|
|  Veena|veena@company.org|2021-06-15|  Data Scientist|105000|company|     Q2|INACTIVE|
|  Vinay|  vinay@gmail.com|2024-01-10|          Intern| 25000|  gmail|     Q1|INACTIVE|
|  Vijay|vijay@hotmail.com|2020-05-22|  Senior Manager|125000|hotmail|     Q2|INACTIVE|
+-------+-----------------+----------+----------------+------+-------+-------+--------+



In [57]:
# Write a PySpark self-join query to find the direct reports of each manager. 
# Additionally, extend the logic to find all hierarchical relationships up to any level.

# 𝐬𝐜𝐡𝐞𝐦𝐚 
data = [ (1, "Alice", None), 
        (2, "Bob", 1), 
        (3, "Charlie", 1), 
        (4, "David", 2), 
        (5, "Eva", 2), 
        (6, "Frank", 3), 
        (7, "Grace", 3) ] 

columns = ["employee_id", "employee_name", "manager_id"]


df = spark.createDataFrame(data, columns)

df.alias('e').join(df.alias('m'),col("m.employee_id") == col("e.manager_id"), 'left')\
    .selectExpr("e.employee_name as employee", "m.employee_name as manager_name")\
    .show()

+--------+------------+
|employee|manager_name|
+--------+------------+
|   Alice|        NULL|
|     Bob|       Alice|
| Charlie|       Alice|
|   David|         Bob|
|     Eva|         Bob|
|   Frank|     Charlie|
|   Grace|     Charlie|
+--------+------------+



                                                                                

25/06/20 14:31:23 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 563370 ms exceeds timeout 120000 ms
25/06/20 14:31:24 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/20 14:31:24 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [None]:
# read a json from below 

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, explode
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType



data = [
 {
 "user_id": "U1",
 "name": "Alice",
 "orders": [
 {
 "order_id": "O1001",
 "amount": 250,
 "items": [
 {"item_id": "I1", "product": "Book", "qty": 2},
 {"item_id": "I2", "product": "Pen", "qty": 5}
 ]
 },
 {
 "order_id": "O1002",
 "amount": 300,
 "items": [
 {"item_id": "I3", "product": "Notebook", "qty": 3}
 ]
 }
 ]
 },
 {
 "user_id": "U2",
 "name": "Bob",
 "orders": [
 {
 "order_id": "O2001",
 "amount": 150,
 "items": [
 {"item_id": "I4", "product": "Eraser", "qty": 1}
 ]
 }
 ]
 }
]


items_schema = StructType([
    StructField("item_id",StringType()),
    StructField("product",StringType()),
    StructField("qty",IntegerType())

])

orders_schema = StructType([
    StructField("order_id",StringType()),
    StructField("amount",IntegerType()),
    StructField("items", ArrayType(items_schema))
])

schema_json = StructType([
    StructField("user_id",StringType()),
    StructField("name", StringType()),
    StructField("orders",ArrayType(orders_schema))
])



spark = SparkSession.builder.appName("read_json").getOrCreate()

# df = spark.read.schema(schema_json).json(data)
# spark.read.option("multiline",True).json("/FileStore/tables/sample_dataset.json")

df = spark.createDataFrame(data, schema_json)

df.show(truncate=False)

df.withColumn('order', explode('orders'))\
  .withColumn('items', explode('order.items')) \
  .select("items.item_id").show()
# .select(col("orders.order_id")).show()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/11 19:23:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------+-----+--------------------------------------------------------------------------------+
|user_id|name |orders                                                                          |
+-------+-----+--------------------------------------------------------------------------------+
|U1     |Alice|[{O1001, 250, [{I1, Book, 2}, {I2, Pen, 5}]}, {O1002, 300, [{I3, Notebook, 3}]}]|
|U2     |Bob  |[{O2001, 150, [{I4, Eraser, 1}]}]                                               |
+-------+-----+--------------------------------------------------------------------------------+

+-------+
|item_id|
+-------+
|     I1|
|     I2|
|     I3|
|     I4|
+-------+



25/07/11 20:49:30 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 189599 ms exceeds timeout 120000 ms
25/07/11 20:49:30 WARN SparkContext: Killing executors is not supported by current scheduler.
25/07/11 20:49:33 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o