In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("CountryLogic").getOrCreate()

data = [
    ("Ram", 30, "Russia"),
    ("Radha", 31, "Russia,Norway"),
    ("Kannu", 35, "Norway,Belgium")
]

df = spark.createDataFrame(data, ["Name", "Age", "Country"])

# 1️⃣ Get all unique countries as ONE row holding a sorted array.
# collect_set already de-duplicates, so no separate .distinct() is needed.
# Its element order is not guaranteed, though; array_sort makes the
# NonVisitedCountry strings below identical on every run.
all_countries = (
    df.select(F.explode(F.split("Country", ",")).alias("country"))
      .agg(F.array_sort(F.collect_set("country")).alias("AllCountries"))
)

all_countries.show(truncate=False)

# 2️⃣ Split visited countries
df2 = df.withColumn("VisitedCountryArr", F.split("Country", ","))
df2.show()

# 3️⃣ Cross join to get all countries for comparison.
# all_countries is a single aggregated row, so this does not multiply rows.
df3 = df2.crossJoin(all_countries)
df3.show(truncate=False)

# 4️⃣ Find non visited countries: everything in AllCountries that the person
# has not visited, rendered as a comma-separated string.
result = df3.withColumn(
    "NonVisitedCountry",
    F.array_join(
        F.array_except(F.col("AllCountries"), F.col("VisitedCountryArr")),
        ","
    )
).select(
    "Name",
    F.col("Country").alias("VisitedCountry"),
    "NonVisitedCountry"
)

result.show(truncate=False)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("NoCrossJoin").getOrCreate()

data = [
    ("Ram", 30, "Russia"),
    ("Radha", 31, "Russia,Norway"),
    ("Kannu", 35, "Norway,Belgium")
]

df = spark.createDataFrame(data, ["Name", "Age", "Country"])

# 1️⃣ Get all unique countries as a sorted Python list.
# collect() order after distinct() is not guaranteed, so sorting keeps the
# NonVisitedCountry strings stable. Reading Row fields directly avoids the RDD
# API, which is not available under Spark Connect.
all_countries = sorted(
    row["country"]
    for row in (
        df.select(F.explode(F.split("Country", ",")).alias("country"))
          .distinct()
          .collect()
    )
)

# The full country list as a literal array column, built once.
all_countries_col = F.array(*[F.lit(c) for c in all_countries])

# 2️⃣ Create visited & non-visited columns
result = (
    df.withColumn("VisitedArr", F.split("Country", ","))
      .withColumn(
          "NonVisitedCountry",
          F.array_join(
              F.array_except(all_countries_col, F.col("VisitedArr")),
              ","
          )
      )
      .select(
          "Name",
          F.col("Country").alias("VisitedCountry"),
          "NonVisitedCountry"
      )
)

result.show(truncate=False)


In [None]:
# Window-based variant: split each person's CountryName into an array, then
# explode it to one row per (person, country) so a window can gather every
# country seen across the whole DataFrame.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    array_except,
    col,
    collect_set,
    concat_ws,
    explode,
    split,
)
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

data = [
    ("Ram", 30, "Russia"),
    ("Radha", 31, "Russia,Norway"),
    ("Kannu", 35, "Norway,Belgium"),
]
column_names = ["Name", "Age", "CountryName"]
df = spark.createDataFrame(data, column_names)

visited_arr = split(col("CountryName"), ",")
df1 = df.withColumn("VisitedCountryArr", visited_arr)
df_exploded = df1.withColumn("country", explode(col("VisitedCountryArr")))

# An unpartitioned window covers every row of the DataFrame (Spark pulls all
# rows into a single partition for it: fine for toy data, a bottleneck at scale).
w = Window.partitionBy()



In [None]:
# Attach the set of every country seen anywhere to each exploded row.
all_countries_expr = collect_set("country").over(w)
df_with_all = df_exploded.withColumn("AllCountries", all_countries_expr)
df_with_all.show(truncate=False)

In [None]:
# Collapse back to one row per person. VisitedCountryArr and AllCountries are
# the same on all of a person's exploded rows, so which row survives does not
# affect them (only the per-row `country` value is arbitrary).
df2 = df_with_all.dropDuplicates(subset=["Name"])
df2.show(truncate=False)

In [None]:
# Countries in the global set that this person has not visited.
non_visited = array_except(col("AllCountries"), col("VisitedCountryArr"))
df2 = df2.withColumn("NonVisitedCountryArr", non_visited)

df2.select("Name", "VisitedCountryArr", "NonVisitedCountryArr").show()

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("test").getOrCreate()

data = [("Ram", 30, "Russia"),
        ("Radha", 31, "Russia,Norway"),
        ("Kannu", 35, "Norway,Belgium")]
df = spark.createDataFrame(data, ["Name", "age", "Country"])

df = df.withColumnRenamed("Country", "visiting_country")

# One row per (person, visited country).
df = df.withColumn("country", F.explode(F.split("visiting_country", ",")))

# Distinct countries as a sorted Python list, read from the exploded `country`
# column. The previous version used `/` instead of `\` for line continuation,
# which is a SyntaxError. It also referenced "Country", which no longer exists
# after the rename and only resolved to `country` via case-insensitive matching.
all_countries = sorted(
    row["country"] for row in df.select("country").distinct().collect()
)
# Build the literal array element by element; F.lit(<list>) needs Spark >= 3.4.
df = df.withColumn("all_country", F.array(*[F.lit(c) for c in all_countries]))

# Back to one row per person (visiting_country is the same on all of them).
df = df.dropDuplicates(subset=["Name"])

df = df.withColumn(
    "NonVisitedCountryArr",
    F.array_except(F.col("all_country"), F.split(F.col("visiting_country"), ","))
)
df = df.drop("country", "all_country")
df.show(truncate=False)



In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("test").getOrCreate()

data = [
    ("Ram", 30, "Russia"),
    ("Radha", 31, "Russia,Norway"),
    ("Kannu", 35, "Norway,Belgium")
]

df = spark.createDataFrame(data, ["Name", "age", "Country"])
df = df.withColumnRenamed("Country", "visiting_country")

# Distinct visited countries as a sorted Python list. Sorting fixes the order
# of the non-visited string (collect() order after distinct() is not
# guaranteed). Reading Row fields avoids the RDD API, which Spark Connect lacks.
all_countries = sorted(
    row["country"]
    for row in (
        df.select(F.explode(F.split("visiting_country", ",")).alias("country"))
          .distinct()
          .collect()
    )
)
# Build the literal array element by element; F.lit(<list>) needs Spark >= 3.4.
df = df.withColumn("all_country", F.array(*[F.lit(c) for c in all_countries]))

# Countries never visited, joined into a comma-separated string
# (despite the "Arr" suffix, this column holds a string).
df = df.withColumn(
    "NonVisitedCountryArr",
    F.array_join(
        F.array_except(
            F.col("all_country"),
            F.split(F.col("visiting_country"), ",")
        ),
        ","
    )
)

df = df.drop("age", "all_country")
df.show(truncate=False)


In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Each player string packs "Name-CODE"; the last field packs "fifties/hundreds".
# Later cells split both apart.
players_columns = ["player", "runs", "50s/100s"]
players_data = [
    ("Sachin-IND", 18694, "93/49"),
    ("Ricky-AUS", 11274, "66/31"),
    ("Lara-WI", 10222, "45/21"),
    ("Rahul-IND", 10355, "95/11"),
    ("Jhonty-SA", 7051, "43/5"),
    ("Hayden-AUS", 8722, "67/19"),
]
players_df = spark.createDataFrame(players_data, players_columns)

# Country code -> full country name lookup table.
countries_columns = ["SRT", "country"]
countries_data = [
    ("IND", "India"),
    ("AUS", "Australia"),
    ("WI", "WestIndies"),
    ("SA", "SouthAfrica"),
]
countries_df = spark.createDataFrame(countries_data, countries_columns)


In [None]:
# "Sachin-IND" -> SRT = "IND", player_name = "Sachin"
player_parts = F.split(F.col("player"), "-")
players_df = (
    players_df
    .withColumn("SRT", player_parts[1])
    .withColumn("player_name", player_parts[0])
)
players_df.show()

In [None]:
# Fifties plus hundreds, e.g. "93/49" -> 93 + 49 = 142.
fifties_hundreds = F.split("50s/100s", "/")
players_df = players_df.withColumn(
    "Sum",
    fifties_hundreds[0].cast("int") + fifties_hundreds[1].cast("int"),
)

In [None]:
# Players whose fifties + hundreds exceed the threshold, with their full
# country name (the inner join drops players whose code has no country row).
MIN_FIFTIES_PLUS_HUNDREDS = 90

join_df = players_df.join(countries_df, on="SRT", how="inner")
# Refer to "Sum" with the exact case it was created with, so this does not
# depend on spark.sql.caseSensitive being false.
(
    join_df
    .filter(F.col("Sum") > MIN_FIFTIES_PLUS_HUNDREDS)
    .select("player_name", "country", "runs", "Sum")
    .show()
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# (empid, dept, salary, date) rows; dates are kept as ISO strings.
data = [
    (100, "IT", 100, "2024-05-12"),
    (200, "IT", 100, "2024-06-12"),
    (100, "FIN", 400, "2024-07-12"),
    (300, "FIN", 500, "2024-07-12"),
    (300, "FIN", 1543, "2024-07-12"),
    (300, "FIN", 1500, "2024-07-12"),
]

schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("empid", IntegerType()),
        ("dept", StringType()),
        ("salary", IntegerType()),
        ("date", StringType()),
    )
])

df = spark.createDataFrame(data, schema)

# empids that appear in exactly one row.
df1 = (
    df.groupBy("empid")
      .count()
      .where(F.col("count") == 1)
)

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("test").getOrCreate()

data = [('A', 1), ('A', 2), ('A', 3),
        ('B', 1), ('B', 2), ('B', 3),
        ('C', 1), ('C', 2), ('C', 3)]
columns = ['id', 'number']

df = spark.createDataFrame(data=data, schema=columns)

# Per id: the sorted list of numbers, the list as a "1,2,3" string, a boolean
# flag for id == 'A', a yes/no label from that flag, and the string split back
# into an array.
# - array_sort: collect_list order is not guaranteed after the shuffle.
# - F.col('id') instead of df.id: refer to the aggregated frame's own column
#   rather than a column object borrowed from another DataFrame.
# - The flag is already boolean, so it is used directly in F.when instead of
#   being compared with the string 'True'.
df1 = (
    df.groupBy('id')
      .agg(F.array_sort(F.collect_list(F.col('number'))).alias("numbers"))
      .withColumn("values", F.array_join(F.col('numbers'), ','))
      .withColumn("new_nums", F.col('id') == 'A')
      .withColumn("condtion", F.when(F.col("new_nums"), "yes").otherwise("no"))
      .withColumn("convert_list", F.split(F.col('values'), ','))
)
df1.show()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("window").getOrCreate()

# Sales records written as tuples, then turned into dicts with the same keys
# (in the same order) as a literal list of dicts would have.
sales_fields = ("sale_id", "region", "salesperson", "amount")
sales_rows = [
    (1, "North", "Arun", 5000),
    (2, "South", "Meena", 7000),
    (3, "East", "Sachin", 6500),
    (4, "North", "Arun", 8000),
    (5, "West", "Priya", 4000),
    (6, "South", "Meena", 9000),
    (7, "East", "Sachin", 7500),
    (8, "West", "Priya", 6000),
    (9, "North", "Kiran", 3000),
    (10, "South", "Deepa", 5500),
]
sales_data = [dict(zip(sales_fields, record)) for record in sales_rows]

df = spark.createDataFrame(data=sales_data)

In [None]:
# Rank sales within each region, smallest amount first, using the three
# ranking functions side by side to compare their tie handling.
win = Window.partitionBy("region").orderBy(F.col("amount"))

ranked_df = (
    df.withColumn("rank", F.rank().over(win))
      .withColumn("dense_rank", F.dense_rank().over(win))
      .withColumn("row_num", F.row_number().over(win))
)
ranked_df.show()

In [None]:
def inc_based_region(a, b):
    """Return amount ``b`` increased by a region-dependent bonus.

    Parameters
    ----------
    a : str or None
        Region name. "North" and "South" get +1000; any other value
        (including None) gets +500.
    b : int or None
        Amount to increase. None (a SQL NULL reaching the UDF) is returned
        unchanged instead of raising TypeError and failing the Spark task.

    Returns
    -------
    int or None
    """
    if b is None:
        return None
    return b + (1000 if a in ("North", "South") else 500)

In [None]:
# Declare the return type explicitly. Without it F.udf defaults to StringType,
# and "added_amount" came back as text (e.g. "6000") instead of a number.
# "long" matches the LongType Spark infers for the Python-int "amount" column.
my_udf = F.udf(inc_based_region, "long")

In [None]:
# Apply the region-based increment UDF to every sale.
amount_with_increment = my_udf("region", "amount")
df.withColumn("added_amount", amount_with_increment).show()

In [None]:
# =========================================================
# 1. STOP EXISTING SPARK SESSION
# =========================================================
# The JDBC jar settings below only take effect when a new session is built,
# so any session left over from earlier cells is stopped first (best effort).
try:
    spark.stop()
except NameError:
    pass  # no session has been created in this kernel yet
except Exception as exc:
    # Best effort: a stale session must not block creating a fresh one.
    print(f"Warning: could not stop previous Spark session: {exc!r}")


# =========================================================
# 2. IMPORTS
# =========================================================
import getpass
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T


# =========================================================
# 3. CREATE SPARK SESSION (WINDOWS + JDBC FIX)
# =========================================================
# PostgreSQL JDBC driver jar; override with POSTGRES_JDBC_JAR on other machines.
POSTGRES_JAR = os.environ.get("POSTGRES_JDBC_JAR", r"C:\spark\postgresql-42.7.3.jar")

spark = (
    SparkSession.builder
    .appName("Postgres_JDBC_EndToEnd")
    .config("spark.jars", POSTGRES_JAR)
    .config("spark.driver.extraClassPath", POSTGRES_JAR)
    .config("spark.executor.extraClassPath", POSTGRES_JAR)
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")


# =========================================================
# 4. CREATE SAMPLE DATAFRAME
# =========================================================
data = [
    (1,'U1','2024-01-01',100),
    (2,'U1','2024-01-05',150),
    (3,'U1','2024-01-10',200),
    (4,'U1','2024-01-15',120),
    (5,'U1','2024-01-20',180),
    (6,'U1','2024-01-25',90),
    (7,'U2','2024-01-03',50),
    (8,'U2','2024-01-15',70),
    (9,'U3','2024-02-01',300),
    (10,'U3','2024-02-05',250),
    (11,'U3','2024-02-10',200),
    (12,'U3','2024-02-15',150),
    (13,'U3','2024-02-18',100),
    (14,'U3','2024-02-20',120)
]

schema = T.StructType([
    T.StructField("order_id", T.IntegerType(), False),
    T.StructField("user_id", T.StringType(), True),
    T.StructField("order_date", T.StringType(), True),
    T.StructField("amount", T.IntegerType(), True)
])

df = spark.createDataFrame(data, schema)

# Parse the ISO date strings into a real DateType column before writing.
df = df.withColumn("order_date", F.to_date("order_date"))

print("===== DATAFRAME BEFORE WRITE =====")
df.show()
print("Row count:", df.count())


# =========================================================
# 5. POSTGRES JDBC CONFIG
# =========================================================
pg_url = "jdbc:postgresql://localhost:5432/Sachin?ssl=false"
PG_TABLE = "public.orders_int"

# Credentials come from the environment (or an interactive prompt) instead of
# being hardcoded in the notebook, where they end up in version control.
pg_props = {
    "user": os.environ.get("PG_USER", "postgres"),
    "password": os.environ.get("PG_PASSWORD") or getpass.getpass("Postgres password: "),
    "driver": "org.postgresql.Driver"
}


# =========================================================
# 6. WRITE TO POSTGRES
# =========================================================
# mode("overwrite") replaces the table's existing contents with this DataFrame.
(
    df.write
    .format("jdbc")
    .option("url", pg_url)
    .option("dbtable", PG_TABLE)
    .options(**pg_props)
    .mode("overwrite")
    .save()
)

print("✅ WRITE COMPLETED")


# =========================================================
# 7. READ BACK FROM POSTGRES (PROOF)
# =========================================================
read_df = (
    spark.read
    .format("jdbc")
    .option("url", pg_url)
    .option("dbtable", PG_TABLE)
    .options(**pg_props)
    .load()
)

print("===== DATA READ BACK FROM POSTGRES =====")
read_df.show()
print("Row count in Postgres:", read_df.count())


# =========================================================
# 8. STOP SPARK
# =========================================================
spark.stop()


In [12]:
# =========================================================
# 1. IMPORTS
# =========================================================
import os
import re
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

# =========================================================
# 2. SPARK SESSION
# =========================================================
spark = (
    SparkSession.builder
    .appName("Excel_Multiple_Sheets")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

# =========================================================
# 3. CREATE SAMPLE DATA
# =========================================================
data = [
    (1,'U1','2024-01-01',100),
    (2,'U1','2024-01-05',150),
    (3,'U1','2024-01-10',200),
    (4,'U1','2024-01-15',120),
    (5,'U1','2024-01-20',180),
    (6,'U1','2024-01-25',90),
    (7,'U2','2024-01-03',50),
    (8,'U2','2024-01-15',70),
    (9,'U3','2024-02-01',300),
    (10,'U3','2024-02-05',250),
    (11,'U3','2024-02-10',200),
    (12,'U3','2024-02-15',150),
    (13,'U3','2024-02-18',100),
    (14,'U3','2024-02-20',120)
]

schema = T.StructType([
    T.StructField("order_id", T.IntegerType(), False),
    T.StructField("user_id", T.StringType(), True),
    T.StructField("order_date", T.StringType(), True),
    T.StructField("amount", T.IntegerType(), True)
])

df = spark.createDataFrame(data, schema)

# =========================================================
# 4. TRANSFORMATIONS
# =========================================================
df = df.withColumn("order_date", F.to_date("order_date"))

# =========================================================
# 5. WRITE SINGLE EXCEL WITH MULTIPLE SHEETS
# =========================================================
# Output folder; override with ORDERS_OUTPUT_DIR instead of editing the
# notebook on another machine.
output_dir = os.environ.get("ORDERS_OUTPUT_DIR", "C:/Git files/My git files/PySpark/files")
os.makedirs(output_dir, exist_ok=True)

output_file = f"{output_dir}/orders_by_user.xlsx"

# Characters Excel does not allow in a sheet name; openpyxl rejects them.
_INVALID_SHEET_CHARS = re.compile(r"[\[\]:*?/\\]")


def _excel_sheet_name(key) -> str:
    """Turn a group key into a valid Excel sheet name.

    Forbidden characters ([]:*?/\\) become "_" and the result is cut to
    Excel's 31-character limit. An empty result falls back to "_".
    Note: two keys that differ only in forbidden characters or past the
    31st character would map to the same sheet name.
    """
    name = _INVALID_SHEET_CHARS.sub("_", str(key))[:31]
    return name or "_"


# Convert Spark DF to Pandas (collects every row to the driver).
pdf = df.toPandas()

# Create Excel writer: one sheet per user_id.
with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
    for user_id, user_df in pdf.groupby("user_id"):
        user_df.to_excel(
            writer,
            sheet_name=_excel_sheet_name(user_id),
            index=False
        )

print(f"✅ Excel file created successfully at {output_file}")

# =========================================================
# 6. STOP SPARK
# =========================================================
spark.stop()


✅ Excel file created successfully at C:/Git files/My git files/PySpark/files/orders_by_user.xlsx


In [None]:
import os

# Quick environment sanity check: is HADOOP_HOME set, and is C:\hadoop\bin on PATH?
hadoop_home = os.environ.get("HADOOP_HOME")
print("HADOOP_HOME =", hadoop_home)

path_value = os.environ.get("PATH", "")
print("PATH contains hadoop =", "C:\\hadoop\\bin" in path_value)
