<a href="https://colab.research.google.com/github/vishwaraj14/Pyhton/blob/main/stockkkk.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install spark



In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HCLStockAnalysis") \
    .getOrCreate()


In [None]:
from pyspark.sql.types import *


In [None]:
from pyspark.sql.functions import *

In [None]:
hcl_schema = StructType().add("Date",DateType()).add("Adjusted Close",DoubleType()).add("Close",DoubleType())\
.add("High",DoubleType()).add("Low",DoubleType()).add("Open",DoubleType()).add("Volume",LongType())


In [None]:
df = spark.read.format("csv") \
    .option("header", "True") \
    .schema(hcl_schema) \
    .load("/content/sample_data/HCLTECH - HCLTECH.csv")


In [None]:
df.show()

+----------+--------------+-----------+-----------+-----------+-----------+--------+
|      Date|Adjusted Close|      Close|       High|        Low|       Open|  Volume|
+----------+--------------+-----------+-----------+-----------+-----------+--------+
|      NULL|          NULL|       NULL|       NULL|       NULL|       NULL|    NULL|
|      NULL|          NULL|       NULL|       NULL|       NULL|       NULL|    NULL|
|2002-08-12|     13.796875|23.76250076|23.98749924|     22.625|22.71249962|11239400|
|2002-08-13|   13.54285812|23.32500076|      24.25|23.14999962|      24.25| 3756368|
|2002-08-14|   13.35778618|23.00625038|23.23749924|22.76874924|23.23749924| 3964016|
|2002-08-15|   13.35778618|23.00625038|23.00625038|23.00625038|23.00625038|       0|
|2002-08-16|   13.70615864|23.60625076|      23.75|      23.25|     23.625| 4848904|
|2002-08-19|   14.15613556|24.38125038|24.54999924|23.91250038|       24.0| 8905448|
|2002-08-20|   15.18309307|26.14999962|26.48125076|24.50625038|  

In [None]:
df.count()

5531

In [None]:
newdf = df.subtract(df.limit(2))

In [None]:
newdf.show()

+----------+--------------+-----------+-----------+-----------+-----------+--------+
|      Date|Adjusted Close|      Close|       High|        Low|       Open|  Volume|
+----------+--------------+-----------+-----------+-----------+-----------+--------+
|2003-05-30|    9.02590847|15.43124962|15.76875019|15.36250019|15.63749981| 6444512|
|2004-02-13|   22.53220749|37.67499924|      37.75|36.45000076|      36.75| 6696864|
|2006-05-04|    47.4326973|      73.75|       74.0|     72.125|      73.75| 2499264|
|2006-09-05|   46.97926712|73.04499817|     73.625|72.86250305|     73.625| 1553288|
|2007-02-02|   53.61912537|81.78749847|       82.5|80.56999969|82.26249695| 1113184|
|2007-02-15|   55.31221771|84.37000275|     85.625|81.13249969|       83.0| 5097728|
|2008-02-21|   48.15816116|71.44999695|72.44999695|      68.25|       70.0| 3734572|
|2008-05-02|   50.42791748|74.26249695|74.86250305|72.11250305|      72.25| 3275484|
|2008-10-15|   26.92432213|39.65000153|40.95000076|       38.5|  

In [None]:
newdf.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Adjusted Close: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Volume: long (nullable = true)



In [None]:
newdf.createOrReplaceTempView("hcl_stock")


In [None]:
spark.catalog.listTables()


[Table(name='hcl_stock', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
spark.sql("""
    SELECT
        SUM(CASE WHEN Date IS NULL THEN 1 ELSE 0 END) AS Date_NullCount,
        SUM(CASE WHEN Open IS NULL THEN 1 ELSE 0 END) AS Open_NullCount,
        SUM(CASE WHEN High IS NULL THEN 1 ELSE 0 END) AS High_NullCount,
        SUM(CASE WHEN Low IS NULL THEN 1 ELSE 0 END) AS Low_NullCount,
        SUM(CASE WHEN Close IS NULL THEN 1 ELSE 0 END) AS Close_NullCount,
        SUM(CASE WHEN Volume IS NULL THEN 1 ELSE 0 END) AS Volume_NullCount
    FROM hcl_stock
""").show()


+--------------+--------------+--------------+-------------+---------------+----------------+
|Date_NullCount|Open_NullCount|High_NullCount|Low_NullCount|Close_NullCount|Volume_NullCount|
+--------------+--------------+--------------+-------------+---------------+----------------+
|             0|             0|             0|            0|              0|               0|
+--------------+--------------+--------------+-------------+---------------+----------------+



In [None]:
spark.sql("""
    SELECT
        MIN(Close) AS Min_Close,
        MAX(Close) AS Max_Close,
        AVG(Close) AS Avg_Close,
        STDDEV(Close) AS StdDev_Close
    FROM hcl_stock
""").show()


+-----------+-----------+-----------------+-----------------+
|  Min_Close|  Max_Close|        Avg_Close|     StdDev_Close|
+-----------+-----------+-----------------+-----------------+
|14.86250019|1872.849976|400.8948375123735|427.9098794977626|
+-----------+-----------+-----------------+-----------------+



In [None]:
duplicate_count = newdf.count() - newdf.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count}")


Number of duplicate rows: 0


In [None]:
date_check_df = spark.sql("""
    SELECT Date
    FROM hcl_stock
    ORDER BY Date
""")
date_check_df.show(10)  # Display first 10 dates to inspect


+----------+
|      Date|
+----------+
|2002-08-12|
|2002-08-13|
|2002-08-14|
|2002-08-15|
|2002-08-16|
|2002-08-19|
|2002-08-20|
|2002-08-21|
|2002-08-22|
|2002-08-23|
+----------+
only showing top 10 rows



In [None]:

date_check_df = spark.sql("""
    SELECT Date
    FROM hcl_stock
    ORDER BY Date
""")
date_check_df.show(10)


+----------+
|      Date|
+----------+
|2002-08-12|
|2002-08-13|
|2002-08-14|
|2002-08-15|
|2002-08-16|
|2002-08-19|
|2002-08-20|
|2002-08-21|
|2002-08-22|
|2002-08-23|
+----------+
only showing top 10 rows



In [None]:
newdf.dropna

In [None]:
# Phase 2 - Data Cleaning and Preparation
# adding new columns - moving average and daily return



In [None]:
from pyspark.sql import Window

In [None]:
# Create a window to calculate the previous day's Close price
window_spec = Window.orderBy("Date")
type(window_spec)

In [None]:
 #Calculate daily returns as a percentage change from the previous day
df_cleaned = newdf.withColumn("Prev_Close", lag("Close").over(window_spec))

In [None]:
df_cleaned = df_cleaned.withColumn("Daily_Return", ((col("Close") - col("Prev_Close")) / col("Prev_Close")) * 100)

In [None]:
df_cleaned.show(5)

+----------+--------------+-----------+-----------+-----------+-----------+--------+-----------+-------------------+
|      Date|Adjusted Close|      Close|       High|        Low|       Open|  Volume| Prev_Close|       Daily_Return|
+----------+--------------+-----------+-----------+-----------+-----------+--------+-----------+-------------------+
|2002-08-12|     13.796875|23.76250076|23.98749924|     22.625|22.71249962|11239400|       NULL|               NULL|
|2002-08-13|   13.54285812|23.32500076|      24.25|23.14999962|      24.25| 3756368|23.76250076|-1.8411361851966963|
|2002-08-14|   13.35778618|23.00625038|23.23749924|22.76874924|23.23749924| 3964016|23.32500076|-1.3665610701570705|
|2002-08-15|   13.35778618|23.00625038|23.00625038|23.00625038|23.00625038|       0|23.00625038|                0.0|
|2002-08-16|   13.70615864|23.60625076|      23.75|      23.25|     23.625| 4848904|23.00625038|  2.607988568713477|
+----------+--------------+-----------+-----------+-----------+-

In [None]:
df_cleaned.orderBy("Date",ascending=False).show(5)

+----------+--------------+-----------+-----------+-----------+-----------+-------+-----------+--------------------+
|      Date|Adjusted Close|      Close|       High|        Low|       Open| Volume| Prev_Close|        Daily_Return|
+----------+--------------+-----------+-----------+-----------+-----------+-------+-----------+--------------------+
|2024-11-13|       1864.75|    1864.75|1879.599976|1859.099976|     1866.0|1253920|1872.849976|-0.43249465273773585|
|2024-11-12|   1872.849976|1872.849976|1892.949951|1864.449951|     1891.0|2711052|1867.300049|  0.2972166686854741|
|2024-11-11|   1867.300049|1867.300049|     1874.0|     1834.0|1846.699951|1679869|     1837.5|  1.6217713741496567|
|2024-11-08|        1837.5|     1837.5|1854.150024|1828.050049|1838.949951|1521324|1831.949951| 0.30295854954827556|
|2024-11-07|   1831.949951|1831.949951|1862.550049|1814.199951|1850.050049|2173869|1838.400024|-0.35085253023255913|
+----------+--------------+-----------+-----------+-----------+-

In [None]:
# Calculate 50-day moving averages for 'Close' prices
window_50 = Window.orderBy(col("Date").desc()).rowsBetween(-49, 0)
df_cleaned = df_cleaned.withColumn("SMA_50", avg("Close").over(window_50))

In [None]:
df_cleaned.show(5)

+----------+--------------+-----------+-----------+-----------+-----------+--------+-----------------+------------------+
|      Date|Adjusted Close|      Close|       High|        Low|       Open|  Volume|     Daily_Return|            SMA_50|
+----------+--------------+-----------+-----------+-----------+-----------+--------+-----------------+------------------+
|2002-08-12|     13.796875|23.76250076|23.98749924|     22.625|22.71249962|11239400|72.23103608607023|       23.76250076|
|2002-08-13|   13.54285812|23.32500076|      24.25|23.14999962|      24.25| 3756368|72.23100584324811|       23.54375076|
|2002-08-14|   13.35778618|23.00625038|23.23749924|22.76874924|23.23749924| 3964016| 72.2310124595811|23.364583966666668|
|2002-08-15|   13.35778618|23.00625038|23.00625038|23.00625038|23.00625038|       0| 72.2310124595811|       23.27500057|
|2002-08-16|   13.70615864|23.60625076|      23.75|      23.25|     23.625| 4848904|72.23097572435512|23.341250608000003|
+----------+------------

In [None]:
# Calculate 200-day moving averages for 'Close' prices
window_200 = Window.orderBy(col("Date").desc()).rowsBetween(-199, 0)
df_cleaned = df_cleaned.withColumn("SMA_200", avg("Close").over(window_200))

In [None]:
df_cleaned.show(5)

+----------+--------------+-----------+-----------+-----------+-----------+-------+------------+------------------+------------------+
|      Date|Adjusted Close|      Close|       High|        Low|       Open| Volume|Daily_Return|            SMA_50|           SMA_200|
+----------+--------------+-----------+-----------+-----------+-----------+-------+------------+------------------+------------------+
|2024-11-13|       1864.75|    1864.75|1879.599976|1859.099976|     1866.0|1253920|         0.0|1810.2370020000003|           1864.75|
|2024-11-12|   1872.849976|1872.849976|1892.949951|1864.449951|     1891.0|2711052|         0.0|1808.7510010199999|       1868.799988|
|2024-11-11|   1867.300049|1867.300049|     1874.0|     1834.0|1846.699951|1679869|         0.0|1807.4270019800001|1868.3000083333334|
|2024-11-08|        1837.5|     1837.5|1854.150024|1828.050049|1838.949951|1521324|         0.0|       1805.146001|     1860.60000625|
|2024-11-07|   1831.949951|1831.949951|1862.550049|1814

In [None]:
df_cleaned.describe(["Open", "High", "Low", "Close", "Volume", "Daily_Return"]).show()

+-------+------------------+-----------------+------------------+-----------------+-----------------+------------------+
|summary|              Open|             High|               Low|            Close|           Volume|      Daily_Return|
+-------+------------------+-----------------+------------------+-----------------+-----------------+------------------+
|  count|              5529|             5529|              5529|             5529|             5529|              5529|
|   mean| 401.0559657028477|405.9276065125634|395.93029846622863|400.8948375123735|4861474.167842286|33.855501195496224|
| stddev|428.04168175116223| 432.448034674308|423.49518200263105|427.9098794977626|4981283.850281568| 19.51758135929127|
|    min|       14.94375038|            15.25|       14.69999981|      14.86250019|                0|               0.0|
|    max|            1891.0|      1892.949951|       1864.449951|      1872.849976|        125442440| 72.23107836098873|
+-------+------------------+----

In [None]:
# Register DataFrame as a temporary SQL view
df_cleaned.createOrReplaceTempView("HCL_stock_cleaned")

In [None]:
# SQL query to calculate summary statistics for the 'Close' and 'Volume' columns
spark.sql("""
    SELECT
        MIN(Close) AS Min_Close,
        MAX(Close) AS Max_Close,
        AVG(Close) AS Avg_Close,
        STDDEV(Close) AS StdDev_Close,
        MIN(Volume) AS Min_Volume,
        MAX(Volume) AS Max_Volume,
        AVG(Volume) AS Avg_Volume,
        STDDEV(Volume) AS StdDev_Volume
    FROM HCL_stock_cleaned
""").show()

+-----------+-----------+-----------------+-----------------+----------+----------+-----------------+-----------------+
|  Min_Close|  Max_Close|        Avg_Close|     StdDev_Close|Min_Volume|Max_Volume|       Avg_Volume|    StdDev_Volume|
+-----------+-----------+-----------------+-----------------+----------+----------+-----------------+-----------------+
|14.86250019|1872.849976|400.8948375123735|427.9098794977626|         0| 125442440|4861474.167842286|4981283.850281568|
+-----------+-----------+-----------------+-----------------+----------+----------+-----------------+-----------------+



In [None]:
# Summary of daily returns
spark.sql("""
    SELECT
        AVG(Daily_Return) AS Avg_Daily_Return,
        STDDEV(Daily_Return) AS Daily_Return_StdDev,
        MIN(Daily_Return) AS Min_Daily_Return,
        MAX(Daily_Return) AS Max_Daily_Return
    FROM apollo_stock_cleaned
""").show()

In [None]:
# a) Correlation between Close Price and Volume

close_volume_corr = df_cleaned.stat.corr("Close", "Volume")
print(f"Correlation between Close and Volume: {close_volume_corr}")



Correlation between Close and Volume: -0.1511055358098198


In [None]:
# SQL query for correlation matrix between key stock metrics
spark.sql("""
    SELECT
        corr(Open, Close) AS Open_Close_Correlation,
        corr(High, Close) AS High_Close_Correlation,
        corr(Low, Close) AS Low_Close_Correlation,
        corr(Volume, Close) AS Volume_Close_Correlation
    FROM hcl_stock_cleaned
""").show()


+----------------------+----------------------+---------------------+------------------------+
|Open_Close_Correlation|High_Close_Correlation|Low_Close_Correlation|Volume_Close_Correlation|
+----------------------+----------------------+---------------------+------------------------+
|    0.9998063302663437|    0.9999216628438844|   0.9999146752989329|    -0.15110553580981984|
+----------------------+----------------------+---------------------+------------------------+



In [None]:
# Using SQL, you can calculate averages over different timeframes and compare them.

spark.sql("""
    SELECT
        Date,
        Close,
        SMA_50,
        SMA_200,
        CASE WHEN SMA_50 > SMA_200 THEN 'Uptrend' ELSE 'Downtrend' END AS Trend
    FROM HCL_stock_cleaned
    ORDER BY Date
""").show(5)


+----------+-----------+------------------+------------------+-------+
|      Date|      Close|            SMA_50|           SMA_200|  Trend|
+----------+-----------+------------------+------------------+-------+
|2002-08-12|23.76250076|       23.76250076|21.945000085499995|Uptrend|
|2002-08-13|23.32500076|       23.54375076|21.913218831699997|Uptrend|
|2002-08-14|23.00625038|23.364583966666668|21.882375077899997|Uptrend|
|2002-08-15|23.00625038|       23.27500057|21.847812575999995|Uptrend|
|2002-08-16|23.60625076|23.341250608000003|21.809687575999998|Uptrend|
+----------+-----------+------------------+------------------+-------+
only showing top 5 rows



In [None]:
# SQL query to find days with the highest volume and corresponding price changes
spark.sql("""
    SELECT
        Date,
        Volume,
        Close,
        Daily_Return
    FROM HCL_stock_cleaned
    ORDER BY Volume DESC
    LIMIT 10
""").show()
'''
This query highlights the days with the highest trading volumes and their corresponding daily returns.
Spikes in volume often accompany big price moves, either up or down.

'''

+----------+---------+-----------+-----------------+
|      Date|   Volume|      Close|     Daily_Return|
+----------+---------+-----------+-----------------+
|2002-10-22|125442440|21.88750076|70.96613747120631|
|2002-10-23| 90007952|20.88750076|70.96610011442965|
|2007-11-08| 80309532|75.16249847|50.62695877412911|
|2003-09-15| 62881504|   20.34375|70.96612279268943|
|2002-10-24| 59089792|20.26250076|70.96613519205633|
|2002-10-25| 49617192|20.43124962|70.96610373267671|
|2003-04-28| 49334920|     17.375|70.96610404767235|
|2012-11-30| 41511124|    164.125|32.68796762428142|
|2003-09-16| 41195184|20.41875076|70.96607848086023|
|2002-11-11| 40565424|20.66250038|70.96611200186098|
+----------+---------+-----------+-----------------+



'\nThis query highlights the days with the highest trading volumes and their corresponding daily returns. \nSpikes in volume often accompany big price moves, either up or down.\n\n'

In [None]:
# Calculate average close price and volume by month using SQL
spark.sql("""
    SELECT
        MONTH(Date) AS Month,
        AVG(Close) AS Avg_Monthly_Close,
        AVG(Volume) AS Avg_Monthly_Volume
    FROM HCl_stock_cleaned
    GROUP BY MONTH(Date)
    ORDER BY Month
""").show()


+-----+------------------+------------------+
|Month| Avg_Monthly_Close|Avg_Monthly_Volume|
+-----+------------------+------------------+
|    1| 398.8136037630601| 4939977.605603448|
|    2|   413.13357465947| 4074476.714285714|
|    3| 395.1653724621459| 4719559.827433628|
|    4|390.08532759115815| 5631843.886524823|
|    5|381.05871072646653|4621296.5845824415|
|    6| 385.8077827727351| 4091963.032051282|
|    7| 397.6156457494814| 4212827.419087137|
|    8|406.68609987876533| 4314228.958158996|
|    9|426.91342427162135| 5448834.947368421|
|   10| 433.6690400457942| 6634669.527896996|
|   11|399.19234470226115|  4913967.33481153|
|   12| 381.8948342055435|4778223.5287846485|
+-----+------------------+------------------+

