In [1]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=d047c3a3a8e2dfd8f7b5efa7e8c741b91ed40347d5a447b5ac39ac0dd4418ed7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
import findspark
findspark.init()


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count


In [4]:
spark = (SparkSession
.builder
.appName("DataFrame")
.getOrCreate())

In [5]:
df = spark.read.csv('/content/walmart_stock.csv',header=True)

In [6]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [7]:
df.schema

StructType([StructField('Date', StringType(), True), StructField('Open', StringType(), True), StructField('High', StringType(), True), StructField('Low', StringType(), True), StructField('Close', StringType(), True), StructField('Volume', StringType(), True), StructField('Adj Close', StringType(), True)])

In [8]:
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [9]:
df.describe().show()

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      NULL| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      NULL|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|2012-01-03|56.389998999999996|        57.060001|        56.299999|        56.419998|         10010500|        50.363689|
|    max|2016-12-30|         90.800003|        90.970001|            89.25|        90.4700

In [10]:
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import col

In [11]:
describe_df = df.describe()

In [12]:
describe_df = describe_df.select(
    describe_df['summary'],
    *[col(c).cast(DecimalType(10, 2)).alias(c) for c in describe_df.columns if c != 'summary']
)

In [13]:
describe_df.show()

+-------+-------+-------+-------+-------+-------+-----------+---------+
|summary|   Date|   Open|   High|    Low|  Close|     Volume|Adj Close|
+-------+-------+-------+-------+-------+-------+-----------+---------+
|  count|1258.00|1258.00|1258.00|1258.00|1258.00|    1258.00|  1258.00|
|   mean|   NULL|  72.36|  72.84|  71.92|  72.39| 8222093.48|    67.24|
| stddev|   NULL|   6.77|   6.77|   6.74|   6.76| 4519780.84|     6.72|
|    min|   NULL|  56.39|  57.06|  56.30|  56.42|10010500.00|    50.36|
|    max|   NULL|  90.80|  90.97|  89.25|  90.47| 9994400.00|    84.91|
+-------+-------+-------+-------+-------+-------+-----------+---------+



In [14]:
df_with_hv_ratio = df.withColumn("HV Ratio", col("High") / col("Volume"))

In [15]:
df_with_hv_ratio.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV Ratio|
+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|   

In [16]:
peak_high_day=df.sort(df.High.desc()).collect()


In [17]:
peak_high_day[1]

Row(Date='2015-01-08', Open='89.209999', High='90.66999799999999', Low='89.07', Close='90.470001', Volume='12713600', Adj Close='84.91421600000001')

In [18]:
from pyspark.sql.functions import mean,max,min


In [19]:
mean_high = df.agg(mean("Close")).first()


In [20]:
mean_high[0]

72.38844998012726

In [21]:
volume_stats = df.agg(max("Volume").alias("max_volume"), min("Volume").alias("min_volume")).collect()


In [22]:
max_volume = volume_stats[0]['max_volume']
min_volume = volume_stats[0]['min_volume']

In [26]:
min_volume

'10010500'

In [24]:
df.sort(df.Volume.desc()).collect()[1]

Row(Date='2016-08-12', Open='73.800003', High='74.120003', Low='73.559998', Close='73.889999', Volume='9994200', Adj Close='72.82940500000001')

In [25]:
close_below_60 = df.filter(df["Close"] < 60)



In [27]:
close_below_60.count()

81

In [28]:
days_high_greater_than_80 = df.filter(df["High"] > 80).count()

# Count the total number of days
total_days = df.count()

# Calculate the percentage
percentage_high_greater_than_80 = (days_high_greater_than_80 / total_days) * 100


In [29]:
percentage_high_greater_than_80

8.426073131955485

In [30]:
from pyspark.sql.types import DoubleType

In [31]:
df = df.withColumn("High", col("High").cast(DoubleType()))
df = df.withColumn("Volume", col("Volume").cast(DoubleType()))

In [32]:

correlation = df.corr("High", "Volume")

In [33]:
correlation

-0.3384326061737161

In [34]:
from pyspark.sql.functions import year

In [35]:
df = df.withColumn("Date", col("Date").cast("date"))


In [37]:
df = df.withColumn("Year", year("Date"))


In [38]:
max_high_per_year = df.groupBy("Year").agg(max("High").alias("Max_High"))


In [39]:
max_high_per_year.show()

+----+---------+
|Year| Max_High|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [40]:
from pyspark.sql.functions import month

In [41]:
df = df.withColumn("Date", col("Date").cast("date"))
df = df.withColumn("month", year("Date"))


In [42]:
Avg_Close= df.groupBy("month").agg(max("Close").alias("Avg_Close"))


In [43]:
avg_close_per_month = Avg_Close.orderBy("month")

In [44]:
Avg_Close.show()

+-----+---------+
|month|Avg_Close|
+-----+---------+
| 2012|77.150002|
| 2013|81.209999|
| 2014|87.540001|
| 2015|90.470001|
| 2016|74.300003|
+-----+---------+

