In [1]:
pip install findspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .getOrCreate()
spark

In [3]:
# Step 1: Import SparkContext
from pyspark import SparkContext
# Step 2: Initialize SparkContext (use getOrCreate to avoid multiple context errors)
sc = SparkContext.getOrCreate()

In [4]:
from pyspark.sql import Row

In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
from pyspark.sql.types import ArrayType,DoubleType,BooleanType
from pyspark.sql.functions import col,array_contains

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Region-wise Employee Count with Partitioning") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

# Load the CSV file as RDD
rdd = sc.textFile("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv")

# Extract header and filter it out
header = rdd.first()
data_rdd = rdd.filter(lambda row: row != header)

# Repartition the RDD for parallel processing
partitioned_rdd = data_rdd.repartition(4)

# Process: Extract COMPANY LOCATION (assumed at index 5), count occurrences
location_counts = partitioned_rdd \
    .map(lambda row: row.split(",", -1)) \
    .filter(lambda fields: len(fields) > 5) \
    .map(lambda fields: (fields[5].strip(), 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda pair: pair[1], ascending=False)

# Convert to DataFrame
row_rdd = location_counts.map(lambda pair: Row(pair[0], str(pair[1])))

schema = StructType([
    StructField("COMPANY_LOCATION", StringType(), True),
    StructField("EMPLOYEE_COUNT", StringType(), True)
])

df = spark.createDataFrame(row_rdd, schema)



df.show()

spark.stop()

+----------------+--------------+
|COMPANY_LOCATION|EMPLOYEE_COUNT|
+----------------+--------------+
|        Hyderbad|           840|
|       Bangalore|           278|
|           Delhi|           272|
|     Navi Mumbai|           190|
|          Mumbai|           188|
|                |            99|
|         Chennai|            83|
|            Pune|            69|
|           Noida|            34|
|          Jaipur|            25|
|    Gandhi Nagar|            21|
+----------------+--------------+



In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, when
from pyspark.sql.types import DoubleType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Income Tax Savings Calculation") \
    .master("local[*]") \
    .getOrCreate()

# Load the CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv")

# Define tax rate
tax_rate = 0.10

# Step 1: Remove rows with null or empty EMP_NAME
cleaned_df = df.filter(
    col("EMP_NAME").isNotNull() & (trim(col("EMP_NAME")) != "")
)

# Step 2: Convert BALANCE to INCOME and calculate TAX_SAVINGS
result_df = cleaned_df \
    .withColumn("INCOME", col("BALANCE").cast(DoubleType())) \
    .withColumn("TAX_SAVINGS",
        when(col("INCOME") > 200000, (col("INCOME") - 200000) * tax_rate)
        .otherwise(0.0)
    ) \
    .select("EMP_NAME", "INCOME", "TAX_SAVINGS") \
    .filter(col("INCOME").isNotNull())

# Show the result
result_df.show()

# Stop the Spark session
spark.stop()

+---------------+---------+------------------+
|       EMP_NAME|   INCOME|       TAX_SAVINGS|
+---------------+---------+------------------+
|        Hussain|  21430.0|               0.0|
|         Nagesh|   2900.0|               0.0|
|       Preetham|   2222.0|               0.0|
| Rajashekarappa| 150681.0|               0.0|
|        Monappa|1878787.0|          167878.7|
|          Uthra| 231886.0|3188.6000000000004|
|Narasimhamurthy| 447787.0|           24778.7|
|         Thorat| 220202.0|            2020.2|
|        Puvvadi|  12180.0|               0.0|
|         Gawand|  59300.0|               0.0|
|      Pulathota|  27000.0|               0.0|
|      Manjunath|  39052.0|               0.0|
|        Kandpal|  60000.0|               0.0|
|     Pazhanivel|   5000.0|               0.0|
|       Chappidi|   1620.0|               0.0|
|           Jose|  22900.0|               0.0|
|         Dsouza|   1300.0|               0.0|
|      Gangasani|   5002.0|               0.0|
|   Ghattaman

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Experience Based Offers") \
    .master("local[*]") \
    .getOrCreate()

# Load the CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv")

# Ensure EXPERIENCE is numeric
df = df.withColumn("EXPERIENCE", col("EXPERIENCE").cast("double"))

# Categorize experience and assign offers
result_df = df.withColumn("Experience_Category",
    when(col("EXPERIENCE") <= 2, "0-2 years")
    .when(col("EXPERIENCE") <= 5, "2-5 years")
    .when(col("EXPERIENCE") <= 10, "5-10 years")
    .otherwise("10+ years")
).withColumn("Offer",
    when(col("Experience_Category") == "0-2 years", "Welcome Kit + 5% Cashback on first purchase")
    .when(col("Experience_Category") == "2-5 years", "Silver Membership + 10% Cashback")
    .when(col("Experience_Category") == "5-10 years", "Gold Membership + 15% Cashback")
    .otherwise("Platinum Membership + 20% Cashback + VIP Support")
)

# Select relevant columns
final_df = result_df.select("EMP_NAME", "EXPERIENCE", "Experience_Category", "Offer")

# Show the result
final_df.show(truncate=False)

# Stop the Spark session
spark.stop()

+---------------+----------+-------------------+-------------------------------------------+
|EMP_NAME       |EXPERIENCE|Experience_Category|Offer                                      |
+---------------+----------+-------------------+-------------------------------------------+
|Hussain        |2.0       |0-2 years          |Welcome Kit + 5% Cashback on first purchase|
|Nagesh         |3.5       |2-5 years          |Silver Membership + 10% Cashback           |
|Preetham       |4.5       |2-5 years          |Silver Membership + 10% Cashback           |
|Rajashekarappa |6.5       |5-10 years         |Gold Membership + 15% Cashback             |
|Monappa        |7.5       |5-10 years         |Gold Membership + 15% Cashback             |
|Uthra          |8.0       |5-10 years         |Gold Membership + 15% Cashback             |
|Narasimhamurthy|4.5       |2-5 years          |Silver Membership + 10% Cashback           |
|Thorat         |3.5       |2-5 years          |Silver Membership + 10

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Top 10 Bank Balances") \
    .master("local[*]") \
    .getOrCreate()

# Load the CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv")

# Convert BALANCE to numeric and drop rows with null balances
df_with_balance = df.withColumn("BALANCE", col("BALANCE").cast("double")) \
                    .na.drop(subset=["BALANCE"])

# Get top 10 accounts by balance
top_balances = df_with_balance.orderBy(col("BALANCE").desc()).limit(10)

# Show the result
top_balances.show()

# Stop the Spark session
spark.stop()

+---------------+-------+--------------+------+-------+----------------+-----------------+--------------+----------+-----------+----------+---------+----------+----------+-------+----+------------+--------------+-------------------+-------------+-------------+-------------+--------------+---------+-------------------+-------------------+----------------+-----------------+------------------+------------------+--------------------+--------------------+----+----+
|       EMP_NAME|EMP_AGE|MARITAL STATUS|EMP ID|COMPANY|COMPANY LOCATION|   SPECIALIZATION|DESIGNATION_ID|  PHONE NO|BANK BRANCH|EXPERIENCE|EDUCATION|  VOTER_ID|  PAN CARD|HOUSING|Loan|LOAN TYPE_ID|REWARD POINTS |SAVINGS ACCOUNT NO.|    CD ACC NO|    FD ACC NO|    OD ACC NO|CURRENT ACC NO|  BALANCE|     CREDIT CARD NO|      DEBIT CARD NO|INTERNET BANKING|AVG BALANCE YEAR1|AVG BALANCE YEAR 2|AVG BALANCE YEAR 3|ADDRESS OF EMPLOYEES|ACCOUNT OPENING DATE|_c32|_c33|
+---------------+-------+--------------+------+-------+---------------

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Partitioned Grouping by Age and Education") \
    .master("local[*]") \
    .getOrCreate()

# Load dataset
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv") \
    .cache()

# Filter out nulls for EMP_AGE and EDUCATION
filtered_df = df.filter(col("EMP_AGE").isNotNull() & col("EDUCATION").isNotNull())

# Group by EMP_AGE and EDUCATION and count
result_df = filtered_df.groupBy("EMP_AGE", "EDUCATION") \
    .agg(count("*").alias("Person_Count")) \
    .orderBy("EMP_AGE", "EDUCATION")

df.show()

# Stop Spark session
spark.stop()

+---------------+-------+--------------+------+-------+----------------+--------------------+--------------+----------+-----------+----------+---------+----------+----------+-------+----+------------+--------------+-------------------+-------------+---------+-------------+--------------+-------+-------------------+-------------------+----------------+-----------------+------------------+------------------+--------------------+--------------------+----+----+
|       EMP_NAME|EMP_AGE|MARITAL STATUS|EMP ID|COMPANY|COMPANY LOCATION|      SPECIALIZATION|DESIGNATION_ID|  PHONE NO|BANK BRANCH|EXPERIENCE|EDUCATION|  VOTER_ID|  PAN CARD|HOUSING|Loan|LOAN TYPE_ID|REWARD POINTS |SAVINGS ACCOUNT NO.|    CD ACC NO|FD ACC NO|    OD ACC NO|CURRENT ACC NO|BALANCE|     CREDIT CARD NO|      DEBIT CARD NO|INTERNET BANKING|AVG BALANCE YEAR1|AVG BALANCE YEAR 2|AVG BALANCE YEAR 3|ADDRESS OF EMPLOYEES|ACCOUNT OPENING DATE|_c32|_c33|
+---------------+-------+--------------+------+-------+----------------+----

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, length, count, desc

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Company Account Holder Count with Partitioning") \
    .master("local[*]") \
    .getOrCreate()

# Load CSV into DataFrame
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv") \
    .cache()

# Clean and filter: remove rows with null or empty COMPANY
cleaned_df = df.filter(col("COMPANY").isNotNull() & (length(trim(col("COMPANY"))) > 0))

# Repartition by COMPANY for optimized processing
partitioned_df = cleaned_df.repartition("COMPANY")

# Group by COMPANY and count
company_counts = partitioned_df.groupBy("COMPANY") \
    .agg(count("*").alias("Account_Holder_Count")) \
    .orderBy(desc("Account_Holder_Count"))

# Show result
company_counts.show(truncate=False)

# Stop Spark session
spark.stop()

+-------------+--------------------+
|COMPANY      |Account_Holder_Count|
+-------------+--------------------+
|IGATE        |749                 |
|IBM          |601                 |
|Wipro        |202                 |
|Cognizant    |100                 |
|Accenture    |96                  |
|Tech Mahindra|64                  |
|Infosys      |55                  |
|Global Edge  |38                  |
|TCS          |32                  |
|Seimens      |31                  |
|Qualcomm     |30                  |
|Cap Gemini   |2                   |
+-------------+--------------------+



In [12]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Loan Analysis using RDD") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

# Load the CSV file as text
data = sc.textFile("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv")

# Extract header and remove it from the data
header = data.first()
rows = data.filter(lambda line: line != header)

# Split each row by comma and extract relevant columns
# MARITAL STATUS = index 2, HOUSING = index 14, LOAN = index 15
analysis_rdd = rows.map(lambda line: line.split(",", -1)) \
    .filter(lambda fields: len(fields) > 15) \
    .map(lambda fields: ((fields[2].strip(), fields[14].strip(), fields[15].strip()), 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: f"{x[0][0]},{x[0][1]},{x[0][2]},{x[1]}")

# Collect and print the results
for line in analysis_rdd.collect():
    print(line)

# Stop Spark session
spark.stop()

married,yes,no,914
single,yes,no,425
divorced,yes,no,204
married,no,yes,16
single,no,yes,2
divorced,no,yes,3
married,yes,yes,153
single,no,no,39
single,yes,yes,54
married,no,no,125
divorced,no,no,29
divorced,yes,yes,36
,,,99


In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Retirement Policy Categorization with Names") \
    .master("local[*]") \
    .getOrCreate()

# Load the CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv")

# Clean and cast EMP_AGE to Integer
cleaned_df = df.filter(col("EMP_AGE").isNotNull()) \
    .withColumn("EMP_AGE", col("EMP_AGE").cast(IntegerType()))

# Categorize based on age
categorized_df = cleaned_df.withColumn("Retirement_Category",
    when(col("EMP_AGE") < 55, "Below Retirement Age")
    .when(col("EMP_AGE").between(55, 60), "Approaching Retirement")
    .otherwise("Eligible for Retirement")
)

# Select employee name and retirement category
result_df = categorized_df.select("EMP_NAME", "EMP_AGE", "Retirement_Category")

# Show the result
result_df.show(truncate=False)

# Stop Spark session
spark.stop()

+---------------+-------+--------------------+
|EMP_NAME       |EMP_AGE|Retirement_Category |
+---------------+-------+--------------------+
|Hussain        |24     |Below Retirement Age|
|Nagesh         |26     |Below Retirement Age|
|Preetham       |27     |Below Retirement Age|
|Rajashekarappa |28     |Below Retirement Age|
|Monappa        |30     |Below Retirement Age|
|Uthra          |31     |Below Retirement Age|
|Narasimhamurthy|27     |Below Retirement Age|
|Thorat         |24     |Below Retirement Age|
|Puvvadi        |25     |Below Retirement Age|
|Gawand         |23     |Below Retirement Age|
|Pulathota      |28     |Below Retirement Age|
|Manjunath      |29     |Below Retirement Age|
|Kandpal        |30     |Below Retirement Age|
|Pazhanivel     |28     |Below Retirement Age|
|Chappidi       |27     |Below Retirement Age|
|Jose           |29     |Below Retirement Age|
|Dsouza         |35     |Below Retirement Age|
|Gangasani      |36     |Below Retirement Age|
|Ghattamaneni

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Reward Tier Offers") \
    .master("local[*]") \
    .getOrCreate()

# Load the CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv")

# Clean and cast REWARD POINTS column
cleaned_df = df.withColumn("REWARD_POINTS", col("REWARD POINTS ").cast(IntegerType()))

# Categorize into reward tiers
with_tiers = cleaned_df.withColumn("Reward_Tier",
    when(col("REWARD_POINTS").between(0, 999), "Bronze")
    .when(col("REWARD_POINTS").between(1000, 4999), "Silver")
    .when(col("REWARD_POINTS").between(5000, 9999), "Gold")
    .when(col("REWARD_POINTS") >= 10000, "Platinum")
    .otherwise("Unknown")
)

# Add personalized offers
with_offers = with_tiers.withColumn("Offer",
    when(col("Reward_Tier") == "Bronze", "5% off on next purchase")
    .when(col("Reward_Tier") == "Silver", "10% off + free shipping")
    .when(col("Reward_Tier") == "Gold", "15% off + priority support")
    .when(col("Reward_Tier") == "Platinum", "20% off + VIP lounge access")
    .otherwise("No offer available")
)

# Select relevant columns
result_df = with_offers.select("EMP_NAME", "REWARD_POINTS", "Reward_Tier", "Offer")

# Show the result
result_df.show(truncate=False)

# Stop Spark session
spark.stop()

+---------------+-------------+-----------+---------------------------+
|EMP_NAME       |REWARD_POINTS|Reward_Tier|Offer                      |
+---------------+-------------+-----------+---------------------------+
|Hussain        |85           |Bronze     |5% off on next purchase    |
|Nagesh         |45           |Bronze     |5% off on next purchase    |
|Preetham       |635          |Bronze     |5% off on next purchase    |
|Rajashekarappa |1235         |Silver     |10% off + free shipping    |
|Monappa        |5214         |Gold       |15% off + priority support |
|Uthra          |6357         |Gold       |15% off + priority support |
|Narasimhamurthy|8564         |Gold       |15% off + priority support |
|Thorat         |123          |Bronze     |5% off on next purchase    |
|Puvvadi        |5            |Bronze     |5% off on next purchase    |
|Gawand         |4562         |Silver     |10% off + free shipping    |
|Pulathota      |2598         |Silver     |10% off + free shippi

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, year, col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Accounts Opened Per Year with SQL") \
    .master("local[*]") \
    .getOrCreate()

# Load the CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("C:\\Users\\adashrat\\Downloads\\BANKDATA.csv")

# Filter rows with valid date format (dd-MM-yyyy)
valid_date_df = df.filter(col("ACCOUNT OPENING DATE").rlike("^\\d{2}-\\d{2}-\\d{4}$"))

# Parse the date
df_with_date = valid_date_df.withColumn(
    "ACCOUNT_OPENING_DATE_PARSED",
    to_date(col("ACCOUNT OPENING DATE"), "dd-MM-yyyy")
)

# Create a temporary view for SQL
df_with_date.createOrReplaceTempView("bankdata")

# Run SQL query to count accounts opened per year
result = spark.sql("""
    SELECT year(ACCOUNT_OPENING_DATE_PARSED) AS YEAR, COUNT(*) AS Account_Count
    FROM bankdata
    WHERE ACCOUNT_OPENING_DATE_PARSED IS NOT NULL
    GROUP BY year(ACCOUNT_OPENING_DATE_PARSED)
    ORDER BY YEAR
""")

# Show the result
result.show()

# Stop Spark session
spark.stop()

+----+-------------+
|YEAR|Account_Count|
+----+-------------+
|1900|            1|
|2006|          116|
|2007|          102|
|2008|          181|
|2009|          410|
|2010|          381|
|2011|          351|
|2012|          321|
|2013|          134|
|2014|            1|
+----+-------------+

