<a href="https://colab.research.google.com/github/sangramkesharidash/Google-Colab-files/blob/main/EQUITYDATA_Screener_EDA_Using_PySpark_SQL_30_APR_2025_Final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cluster').getOrCreate()
print('Spark Version: {}'.format(spark.version))

Spark Version: 3.5.5


In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import re
import os
from google.colab import drive

# Check if Google Drive is mounted; mount if not
if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')
else:
    print("Google Drive already mounted.")

# Define file path and output folder
file_path = "/content/drive/My Drive/equitydata-screener-all-stocks/equitydata-screener-all-stocks-20250430.csv"
print(file_path)
!ls -ltr $file_path

Mounted at /content/drive
/content/drive/My Drive/equitydata-screener-all-stocks/equitydata-screener-all-stocks-20250430.csv
ls: cannot access '/content/drive/My': No such file or directory
ls: cannot access 'Drive/equitydata-screener-all-stocks/equitydata-screener-all-stocks-20250430.csv': No such file or directory


In [3]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("YourAppName").getOrCreate()

# Loading the data
dataset = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the data in the above file using the below command
#dataset.show(5)

# It's good practice to stop the SparkSession when you're done
# spark.stop()

In [4]:
from pyspark.sql import SparkSession

# Define file path (replace with your actual file path if needed)
#file_path = '/content/drive/My Drive/equitydata-screener-all-stocks/equitydata-screener-all-stocks-20250428.csv'

# Create a SparkSession
spark = SparkSession.builder.appName("IndustryPEAnalysis").getOrCreate()

# Loading the data
dataset = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first 5 rows of the data
print("First 5 rows of the dataset:")
#dataset.show(5)

# Print the schema of the DataFrame
print("Schema of the dataset:")
#dataset.printSchema()

# Register the DataFrame as a temporary SQL view
dataset.createOrReplaceTempView("equity_data")

# Execute SQL query to calculate the average Industry PE for each industry
result_df = spark.sql("""
SELECT
        Industry,
        CAST(SUM(`Market Capitalization`) AS BIGINT) as Total_Market_Capitalization_CR,
        COUNT(Name) as Total_Stocks,
        ROUND(AVG(`Industry PE`), 0) as Average_Industry_PE,
        CAST(ROUND(SUM(CASE WHEN `Price to Earning` < `Industry PE` THEN 1 ELSE 0 END) * 100 / COUNT(Name), 0) AS INT) as Percentage_Below_Industry_PE
    FROM
        equity_data
    GROUP BY
        Industry
    ORDER BY
        Total_Market_Capitalization_CR DESC
""")

# Show the result
print("\n Industry Analysis :")
result_df.show(truncate=False)

# It's good practice to stop the SparkSession when you're done
spark.stop()

First 5 rows of the dataset:
Schema of the dataset:

 Industry Analysis :
+----------------------------------------------+------------------------------+------------+-------------------+----------------------------+
|Industry                                      |Total_Market_Capitalization_CR|Total_Stocks|Average_Industry_PE|Percentage_Below_Industry_PE|
+----------------------------------------------+------------------------------+------------+-------------------+----------------------------+
|Finance & Investments                         |3810188                       |556         |20.0               |35                          |
|Banks - Private Sector                        |3792444                       |29          |12.0               |52                          |
|Computers - Software - Large                  |3051179                       |10          |30.0               |50                          |
|Refineries                                    |2344604                   

In [5]:
from pyspark.sql.functions import col
# Create a SparkSession
spark = SparkSession.builder.appName("PSLQCalculation").getOrCreate()
# Load the data
file_path = '/content/drive/My Drive/equitydata-screener-all-stocks/equitydata-screener-all-stocks-20250428.csv' # Replace with your actual file path
dataset = spark.read.csv(file_path, header=True, inferSchema=True)
# Fill null values with 0 for Market Capitalization and Sales latest quarter
dataset = dataset.fillna(0, subset=["Market Capitalization", "Sales latest quarter"])

# Register the DataFrame as a temporary SQL view
dataset.createOrReplaceTempView("equity_data")

# Calculate PSLQ
result_df = spark.sql("""
    SELECT
        Name,
        `High price` as `52w High Rs.`,
        `Low price` as `52w Low`,
        round((`High price` - `Current Price`) / `High price` * 100, 1) as `Down %`,
        `Current Price`,
        `DMA 200`,
        `DMA 50`,
        Industry,
        `Industry PE`,
        round(`Market Capitalization` / Sales, 1) as PS_Ratio,
        round(`Market Capitalization` / (`Sales latest quarter` * 4), 1) as `PS_LastQtr`,
        round(`Sales latest quarter` / (`Profit after tax latest quarter` * 4), 1) as LastQtr_PAT_Pct
    FROM
        equity_data
    WHERE
        Industry IN ('Banks - Private Sector')

""")

# Show the first 5 rows with the PSLQ
#dataset.select("Name", "Market Capitalization", "Sales latest quarter", "PSLQ").show(5, truncate=False)

# Stop the SparkSession
#spark.stop()



# Execute SQL query to calculate the average Industry PE for each industry
result_df = spark.sql("""
    SELECT
        Name,
        `High price` as `52w High Rs.`,
        `Low price` as `52w Low`,
        round((`High price` - `Current Price`) / `High price` * 100, 1) as `Down %`,
        `Current Price`,
        `DMA 200`,
        `DMA 50`,
        Industry,
        `Industry PE`,
        round(`Market Capitalization` / Sales, 1) as PS_Ratio,
        round(`Market Capitalization` / (`Sales latest quarter` * 4), 1) as `PS_LastQtr`,
        round(`Sales latest quarter` / (`Profit after tax latest quarter` * 4), 1) as LastQtr_PAT_Pct
    FROM
        equity_data
    WHERE
        Industry IN ('Banks - Private Sector')

""")

# Industry IN ('Banks - Private Sector') AND `Current Price` < `DMA 200`

# Show the result
print("\n Industry: Banks - Private Sector")
result_df.show(truncate=False)

# It's good practice to stop the SparkSession when you're done
#spark.stop()


 Industry: Banks - Private Sector
+----------------+------------+-------+------+-------------+-------+-------+----------------------+-----------+--------+----------+---------------+
|Name            |52w High Rs.|52w Low|Down %|Current Price|DMA 200|DMA 50 |Industry              |Industry PE|PS_Ratio|PS_LastQtr|LastQtr_PAT_Pct|
+----------------+------------+-------+------+-------------+-------+-------+----------------------+-----------+--------+----------+---------------+
|AU Small Finance|755.4       |478.35 |8.9   |688.4        |599.91 |577.02 |Banks - Private Sector|11.9       |3.2     |3.0       |2.1            |
|Axis Bank       |1339.65     |933.5  |11.0  |1192.8       |1097.36|1093.97|Banks - Private Sector|11.9       |2.9     |2.8       |1.1            |
|Bandhan Bank    |222.31      |128.15 |26.1  |164.26       |168.16 |152.91 |Banks - Private Sector|11.9       |1.2     |1.2       |3.2            |
|Capital Small   |406.7       |250.0  |25.3  |303.75       |301.85 |282.98 |B

In [6]:
# Execute SQL query to calculate the average Industry PE for each industry
result_df = spark.sql("""
    SELECT
        Name,
        `High price` as `52w High Rs.`,
        `Low price` as `52w Low`,
        round((`High price` - `Current Price`) / `High price` * 100, 1) as `Down %`,
        `Current Price`,
        `DMA 200`,
        `DMA 50`,
        Industry,
        `Industry PE`,
        round(`Market Capitalization` / Sales, 1) as PS_Ratio,
        round(`Market Capitalization` / (`Sales latest quarter` * 4), 1) as `PS_LastQtr`,
        round(`Sales latest quarter` / (`Profit after tax latest quarter` * 4), 1) as LastQtr_PAT_Pct
    FROM
        equity_data
    WHERE
        Industry IN ('Healthcare') AND `Current Price` < `DMA 200`

""")

# Industry IN ('Banks - Private Sector') AND `Current Price` < `DMA 200`

# Show the result
print("\n Industry: Healthcare")
result_df.show(truncate=False)

# It's good practice to stop the SparkSession when you're done
spark.stop()


 Industry: Healthcare
+----------------+------------+-------+------+-------------+-------+-------+----------+-----------+--------+----------+---------------+
|Name            |52w High Rs.|52w Low|Down %|Current Price|DMA 200|DMA 50 |Industry  |Industry PE|PS_Ratio|PS_LastQtr|LastQtr_PAT_Pct|
+----------------+------------+-------+------+-------------+-------+-------+----------+-----------+--------+----------+---------------+
|Aatmaj Health   |41.5        |14.5   |53.4  |19.35        |24.36  |18.28  |Healthcare|56.07      |2.8     |1.3       |6.9            |
|Adeshwar Meditex|29.98       |15.0   |46.6  |16.0         |24.84  |21.33  |Healthcare|56.07      |0.3     |0.1       |11.4           |
|Amkay Products  |120.96      |36.0   |60.7  |47.5         |63.22  |47.61  |Healthcare|56.07      |1.2     |0.6       |2.4            |
|Artemis Medicare|350.0       |143.05 |24.6  |264.05       |264.85 |271.99 |Healthcare|56.07      |4.1     |4.0       |2.7            |
|Bandaram Pharma |63.14  

In [7]:
from pyspark.sql import SparkSession

# Define file path (replace with your actual file path if needed)
file_path = '/content/drive/My Drive/equitydata-screener-all-stocks/equitydata-screener-all-stocks-20250428.csv'

# Create a SparkSession
spark = SparkSession.builder.appName("IndustryAnalysis").getOrCreate()

# Loading the data
dataset = spark.read.csv(file_path, header=True, inferSchema=True)

# Register the DataFrame as a temporary SQL view
dataset.createOrReplaceTempView("equity_data")

# Get all unique industries
industries_df = spark.sql("SELECT DISTINCT Industry FROM equity_data")
industries = [row.Industry for row in industries_df.collect()]

# Iterate over each industry and print the analysis
for industry in industries:
    query = f"""
        SELECT
            Name,
            `High price` as `52w High Rs.`,
            `Low price` as `52w Low`,
            round((`High price` - `Current Price`) / `High price` * 100, 1) as `Down %`,
            `Current Price`,
            `DMA 200`,
            `DMA 50`,
            Industry,
            `Industry PE`,
            round(`Market Capitalization` / Sales, 1) as PS_Ratio,
            round(`Market Capitalization` / (`Sales latest quarter` * 4), 1) as `PS_LastQtr`,
            round(`Sales latest quarter` / (`Profit after tax latest quarter` * 4), 1) as LastQtr_PAT_Pct
        FROM
            equity_data
        WHERE
            Industry = '{industry}' AND `Current Price` < `DMA 200`
    """
    result_df = spark.sql(query)
    print(f"\n Industry: {industry}")
    result_df.show(truncate=False)

# It's good practice to stop the SparkSession when you're done
spark.stop()


 Industry: Personal Care - Multinational
+----------------+------------+-------+------+-------------+--------+--------+-----------------------------+-----------+--------+----------+---------------+
|Name            |52w High Rs.|52w Low|Down %|Current Price|DMA 200 |DMA 50  |Industry                     |Industry PE|PS_Ratio|PS_LastQtr|LastQtr_PAT_Pct|
+----------------+------------+-------+------+-------------+--------+--------+-----------------------------+-----------+--------+----------+---------------+
|Colgate-Palmoliv|3893.0      |2311.5 |30.8  |2694.5       |2744.24 |2542.34 |Personal Care - Multinational|54.06      |12.1    |12.6      |1.1            |
|Gillette India  |10699.0     |6191.0 |24.1  |8123.0       |8359.3  |8196.21 |Personal Care - Multinational|54.06      |9.5     |9.7       |1.4            |
|Hind. Unilever  |3035.0      |2136.0 |23.6  |2318.6       |2419.44 |2306.67 |Personal Care - Multinational|54.06      |8.6     |8.7       |1.5            |
|P & G Hygiene  