In [1]:
import pandas as pd

In [2]:
import findspark
findspark.init()
findspark.find()

'D:\\Foundation for CS\\spark\\spark-3.5.0-bin-hadoop3'

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Finance Data with PySpark") \
    .getOrCreate()

# Read data from the json line file obtained from 
data_spark = spark.read.json('PriceHistory.jsonl')

# Show the DataFrame schema and sample data
data_spark.printSchema()
data_spark.show() 

root
 |-- GiaCaoNhat: double (nullable = true)
 |-- GiaDieuChinh: double (nullable = true)
 |-- GiaDongCua: double (nullable = true)
 |-- GiaMoCua: double (nullable = true)
 |-- GiaThapNhat: double (nullable = true)
 |-- GiaTriKhopLenh: long (nullable = true)
 |-- GtThoaThuan: long (nullable = true)
 |-- KLThoaThuan: long (nullable = true)
 |-- KhoiLuongKhopLenh: long (nullable = true)
 |-- ThayDoi: string (nullable = true)
 |-- date: string (nullable = true)

+----------+------------+----------+--------+-----------+--------------+------------+-----------+-----------------+--------------+----------+
|GiaCaoNhat|GiaDieuChinh|GiaDongCua|GiaMoCua|GiaThapNhat|GiaTriKhopLenh| GtThoaThuan|KLThoaThuan|KhoiLuongKhopLenh|       ThayDoi|      date|
+----------+------------+----------+--------+-----------+--------------+------------+-----------+-----------------+--------------+----------+
|      18.8|       18.75|     18.75|    18.7|       18.6|   41730000000|           0|          0|          22

In [5]:
from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql.functions import udf, month, dayofweek
from pyspark.sql.types import DateType

# Setting a user defined function:
# This function converts the string cell into a date:
func =  udf (lambda x: datetime.strptime(x, '%d/%m/%Y'), DateType())
data_spark = data_spark.withColumn('date', func(col('date')))
data_spark.printSchema()

root
 |-- GiaCaoNhat: double (nullable = true)
 |-- GiaDieuChinh: double (nullable = true)
 |-- GiaDongCua: double (nullable = true)
 |-- GiaMoCua: double (nullable = true)
 |-- GiaThapNhat: double (nullable = true)
 |-- GiaTriKhopLenh: long (nullable = true)
 |-- GtThoaThuan: long (nullable = true)
 |-- KLThoaThuan: long (nullable = true)
 |-- KhoiLuongKhopLenh: long (nullable = true)
 |-- ThayDoi: string (nullable = true)
 |-- date: date (nullable = true)



In [6]:
data_spark.describe().show()

+-------+------------------+------------------+------------------+-----------------+------------------+--------------------+--------------------+------------------+------------------+--------------+
|summary|        GiaCaoNhat|      GiaDieuChinh|        GiaDongCua|         GiaMoCua|       GiaThapNhat|      GiaTriKhopLenh|         GtThoaThuan|       KLThoaThuan| KhoiLuongKhopLenh|       ThayDoi|
+-------+------------------+------------------+------------------+-----------------+------------------+--------------------+--------------------+------------------+------------------+--------------+
|  count|              1479|              1479|              1479|             1479|              1479|                1479|                1479|              1479|              1479|          1479|
|   mean|27.474171737660587|13.179154834347544|27.123394185260306|27.11663286004058|26.714604462474632|7.962319958755916E10|2.429093607649087E10|1032499.5490196078|2956606.3488843814|          NULL|
| std

In [7]:
print(data_spark.orderBy(data_spark["GiaCaoNhat"].desc()).head(1)[0][10]) # date
print(data_spark.orderBy(data_spark["GiaCaoNhat"].desc()).head(1)[0][0]) # price

2018-04-18
52.4


In [8]:
#Giá trị lợi nhuận thấp nhất
data_spark = data_spark.withColumn("Return", data_spark["GiaDongCua"]-data_spark["GiaMoCua"])
data_spark.select('Return').describe().show()

+-------+--------------------+
|summary|              Return|
+-------+--------------------+
|  count|                1479|
|   mean|0.006761325219743...|
| stddev|  0.5944843382137692|
|    min| -2.8999999999999986|
|    max|  3.1999999999999957|
+-------+--------------------+



In [9]:
#Tỷ lệ lợi nhuận thấp nhất
data_spark = data_spark.withColumn("ReturnPCT", (data_spark["Return"]/data_spark["GiaDongCua"])*100)
data_spark.select('ReturnPCT').describe().show()

+-------+--------------------+
|summary|           ReturnPCT|
+-------+--------------------+
|  count|                1479|
|   mean|-0.00185971527951...|
| stddev|  2.0408231111659716|
|    min|  -9.407665505226479|
|    max|   9.417040358744401|
+-------+--------------------+



In [10]:
print(data_spark.orderBy(data_spark["ReturnPCT"].asc()).head(1)[0][10]) # date
print(data_spark.orderBy(data_spark["ReturnPCT"].asc()).head(1)[0][12]) # returnPCT


2022-11-07
-9.407665505226479


In [11]:
#Tháng nào có trung bình daily return cao nhất?
data_spark = data_spark.withColumn('date_string', data_spark['date'].cast('string'))

data_spark.groupBy(month("date_string").alias("month")).agg({'Return': 'mean'}).orderBy('avg(Return)').collect()

[Row(month=4, avg(Return)=-0.06271186440677946),
 Row(month=10, avg(Return)=-0.058712121212121146),
 Row(month=12, avg(Return)=-0.01853448275862066),
 Row(month=9, avg(Return)=-0.014705882352941117),
 Row(month=7, avg(Return)=-0.014015151515151257),
 Row(month=11, avg(Return)=-0.010465116279069585),
 Row(month=3, avg(Return)=0.003358208955223902),
 Row(month=6, avg(Return)=0.02054263565891458),
 Row(month=2, avg(Return)=0.03118811881188114),
 Row(month=5, avg(Return)=0.0560483870967741),
 Row(month=8, avg(Return)=0.06231343283582091),
 Row(month=1, avg(Return)=0.09639639639639648)]

In [13]:
#Ngày nào trong tuần có trung bình giá trị lợi nhuận cao nhất
data_spark = data_spark.withColumn("DayOfWeek", dayofweek(data_spark['date']))
data_spark.groupBy('DayOfWeek').agg({'Return': 'mean'}).orderBy('avg(Return)').collect()

[Row(DayOfWeek=2, avg(Return)=-0.0860344827586207),
 Row(DayOfWeek=5, avg(Return)=-0.03394648829431419),
 Row(DayOfWeek=3, avg(Return)=0.006081081081081138),
 Row(DayOfWeek=4, avg(Return)=0.05219594594594593),
 Row(DayOfWeek=6, avg(Return)=0.09345637583892623)]

In [15]:
data_spark.printSchema()

root
 |-- GiaCaoNhat: double (nullable = true)
 |-- GiaDieuChinh: double (nullable = true)
 |-- GiaDongCua: double (nullable = true)
 |-- GiaMoCua: double (nullable = true)
 |-- GiaThapNhat: double (nullable = true)
 |-- GiaTriKhopLenh: long (nullable = true)
 |-- GtThoaThuan: long (nullable = true)
 |-- KLThoaThuan: long (nullable = true)
 |-- KhoiLuongKhopLenh: long (nullable = true)
 |-- ThayDoi: string (nullable = true)
 |-- date: date (nullable = true)
 |-- Return: double (nullable = true)
 |-- ReturnPCT: double (nullable = true)
 |-- date_string: string (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DayTrading: double (nullable = true)



In [18]:
#Ngày có dao động giao dịch trong ngày (Day trading) thấp nhất
day_trading_formula = 100 * (col('GiaCaoNhat') - col('GiaThapNhat')) / col('GiaDongCua')
data_spark = data_spark.withColumn("DayTrading", day_trading_formula)

#data_spark.select(['High','Low','Close','DayTrading']).show()
print(data_spark.orderBy(data_spark["DayTrading"].asc()).head(1)[0][10]) # date
print(data_spark.orderBy(data_spark["DayTrading"].asc()).head(1)[0][15]) # returnPCT

2019-01-10
0.5172413793103522


In [20]:
# Moving avg: đường nối tất cả những giá đóng cửa trung bình trong 50 ngày gần nhất
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
windowSpec = Window.orderBy(col("date")).rowsBetween(-50, 0)

data_spark = data_spark.withColumn('50DMA', avg("GiaDongCua").over(windowSpec)) 

In [21]:
#Độ lệch so với đường
from pyspark.sql.functions import log 
deviation = log(col('GiaDongCua')/col('50DMA'))*100
data_spark = data_spark.withColumn("50_DMA_DEV", deviation)
data_spark.select(['date','GiaDongCua','50DMA','50_DMA_DEV']).show()

+----------+----------+------------------+-------------------+
|      date|GiaDongCua|             50DMA|         50_DMA_DEV|
+----------+----------+------------------+-------------------+
|2018-01-05|      39.6|              39.6|                0.0|
|2018-01-08|      39.7|39.650000000000006|0.12602396122877732|
|2018-01-09|     39.75| 39.68333333333334|0.16785568355787414|
|2018-01-10|      40.3|39.837500000000006|   1.15427892091823|
|2018-01-11|      42.5|40.370000000000005| 5.1417141065521506|
|2018-01-12|     45.45| 41.21666666666667|   9.77701153241724|
|2018-01-15|      45.5| 41.82857142857143|   8.41326929250616|
|2018-01-16|     44.95|          42.21875|  6.268632575715697|
|2018-01-17|      42.7| 42.27222222222222|  1.006873494826658|
|2018-01-18|      44.0|            42.445|  3.598051372832993|
|2018-01-19|      45.2|42.695454545454545|  5.700462319603171|
|2018-01-22|      45.3|42.912499999999994|  5.414387370517787|
|2018-01-25|      47.8| 43.28846153846153|  9.913951717

In [23]:
#Ngày có lệch nhiều nhất
print(data_spark.orderBy(data_spark["50_DMA_DEV"].desc()).head(1)[0][10]) # date
print(data_spark.orderBy(data_spark["50_DMA_DEV"].desc()).head(1)[0][17]) # dev 

2020-06-03
27.49892043323719


In [24]:
#Tháng giao dịch sôi động nhất, nhiều khối lượng khớp lệnh nhất
data_spark.groupBy(month("date_string").alias("month")).agg({'KhoiLuongKhopLenh': 'mean'}).orderBy('avg(KhoiLuongKhopLenh)').collect()

[Row(month=7, avg(KhoiLuongKhopLenh)=1967880.8333333333),
 Row(month=8, avg(KhoiLuongKhopLenh)=2045878.880597015),
 Row(month=6, avg(KhoiLuongKhopLenh)=2219920.465116279),
 Row(month=9, avg(KhoiLuongKhopLenh)=2656072.5210084035),
 Row(month=5, avg(KhoiLuongKhopLenh)=2737692.8225806453),
 Row(month=3, avg(KhoiLuongKhopLenh)=2870759.925373134),
 Row(month=4, avg(KhoiLuongKhopLenh)=2923839.1525423727),
 Row(month=10, avg(KhoiLuongKhopLenh)=3002650.5303030303),
 Row(month=2, avg(KhoiLuongKhopLenh)=3067721.089108911),
 Row(month=12, avg(KhoiLuongKhopLenh)=3606062.8448275863),
 Row(month=1, avg(KhoiLuongKhopLenh)=4019057.927927928),
 Row(month=11, avg(KhoiLuongKhopLenh)=4625528.682170543)]

In [25]:
# các giá trị sàn của mỗi năm
from pyspark.sql.functions import year
data_spark.groupBy(year("date_string").alias("Year")).agg({'GiaCaoNhat': 'max'}).orderBy('max(GiaCaoNhat)').collect()


[Row(Year=2023, max(GiaCaoNhat)=20.1),
 Row(Year=2020, max(GiaCaoNhat)=32.0),
 Row(Year=2019, max(GiaCaoNhat)=32.35),
 Row(Year=2022, max(GiaCaoNhat)=32.5),
 Row(Year=2021, max(GiaCaoNhat)=37.9),
 Row(Year=2018, max(GiaCaoNhat)=52.4)]