# Import libraries

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Linear Regression Model").getOrCreate()

FileNotFoundError: [WinError 2] The system cannot find the file specified

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import IndexToString, StringIndexer

# Load and verify data

In [2]:
data = spark.read.csv('ASRI.csv',header = True, inferSchema = True)

NameError: name 'spark' is not defined

In [4]:
data.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: integer (nullable = true)
 |-- High: integer (nullable = true)
 |-- Low: integer (nullable = true)
 |-- Close: integer (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Frequency: double (nullable = true)



In [5]:
data.head(3)

[Row(Date=datetime.date(2020, 1, 2), Open=0, High=240, Low=236, Close=238, Volume='3.323.600', Value='789.137.000', Frequency=647.0),
 Row(Date=datetime.date(2020, 1, 3), Open=0, High=240, Low=236, Close=236, Volume='15.815.400', Value='3.751.160.600', Frequency=1.071),
 Row(Date=datetime.date(2020, 1, 6), Open=0, High=236, Low=232, Close=234, Volume='3.246.300', Value='759.807.800', Frequency=400.0)]

In [6]:
for item in data.head(1)[0]:
    print(item)

2020-01-02
0
240
236
238
3.323.600
789.137.000
647.0


In [7]:
data.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Value', 'Frequency']

In [8]:
data.groupBy('Close').count().show()

+-----+-----+
|Close|count|
+-----+-----+
|  133|    3|
|  108|    3|
|  155|   16|
|  193|    4|
|  101|    1|
|  115|    5|
|  126|    3|
|  210|    2|
|  183|   19|
|  192|    3|
|  159|   13|
|  236|   14|
|  222|    1|
|  128|    7|
|  230|    2|
|  122|    2|
|  232|    5|
|  157|   12|
|  190|   10|
|  246|    4|
+-----+-----+
only showing top 20 rows



# Data Preprocessing

In [9]:
indexer_volume = StringIndexer(inputCol='Volume', outputCol='VolumeIndex')
indexer_value = StringIndexer(inputCol='Value', outputCol='ValueIndex')

In [None]:
# Fitting dan transforming DataFrame
indexed = indexer_volume.fit(data).transform(data)
indexed = indexer_value.fit(indexed).transform(indexed)

In [None]:
indexed.show()

In [None]:
indexed.printSchema()

In [None]:
assembler = VectorAssembler(inputCols =['Open', 'High', 'Low', 'Close','Frequency'],
                            outputCol='features')

In [None]:
output = assembler.transform(indexed)

In [None]:
output.printSchema()

In [None]:
print(output.features)

In [None]:
output.head(1)

In [None]:
final_data = output.select('features','Close')

In [None]:
final_data.show()

# Train Test split

In [None]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [None]:
train_data.describe().show()

In [None]:
test_data.describe().show()

# Build Model

In [None]:
regressor = LinearRegression(labelCol='Close')

In [None]:
model = regressor.fit(train_data)

# Evaluate Model

In [None]:
pred_data = model.evaluate(test_data)

In [None]:
pred_data.residuals.show()

In [None]:
pred_data.rootMeanSquaredError

In [None]:
pred_data.r2

In [None]:
pred_data.meanSquaredError

In [None]:
pred_data.meanAbsoluteError

In [None]:
from pyspark.sql import functions as f
data.select(f.corr('Close', 'Low')).show()

In [None]:
unlabeled_data = test_data.select('features')
test_predictions = model.transform(unlabeled_data)

In [None]:
test_predictions.show()

# Visualisasi Saham

In [None]:
import matplotlib.pyplot as plt
# Mendapatkan prediksi dari test data
predictions = test_predictions.select('prediction').collect()
actual_values = test_data.select('Close').collect()

# Mengonversi hasil prediksi dan nilai aktual ke dalam list
predicted_values = [row['prediction'] for row in predictions]
actual_values_list = [row['Close'] for row in actual_values]

# Membuat line chart
plt.figure(figsize=(10, 6))

# Plot nilai aktual
plt.plot(actual_values_list, label='Actual', marker='o', linestyle='-', color='blue')

# Plot hasil prediksi
plt.plot(predicted_values, label='Predicted', marker='x', linestyle='--', color='red')

# Menambahkan label sumbu x dan y, judul, dan legend
plt.xlabel('Data Points')
plt.ylabel('Price')
plt.title('Line Chart Actual And Predicted Close Prices')
plt.legend()
plt.grid(True)
plt.tight_layout()

# Menampilkan line chart
plt.show()

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Mengonversi DataFrame Spark menjadi Pandas DataFrame
pandas_df = data.toPandas()

# Mengatur kolom 'Date' sebagai index dan mengonversi ke tipe datetime
pandas_df['Date'] = pd.to_datetime(pandas_df['Date'])
pandas_df.set_index('Date', inplace=True)

# Membuat candlestick chart menggunakan matplotlib
plt.figure(figsize=(10, 6))
plt.title('Candlestick Chart')
plt.xlabel('Date')
plt.ylabel('Price')

# Plotting candlestick chart
plt.plot(pandas_df.index, pandas_df['Open'], color='green', marker='o', linestyle='-')
plt.plot(pandas_df.index, pandas_df['Close'], color='red', marker='o', linestyle='-')
plt.plot(pandas_df.index, pandas_df['High'], color='blue', linestyle='--')
plt.plot(pandas_df.index, pandas_df['Low'], color='orange', linestyle='--')

# Menampilkan chart
plt.legend(['Open', 'Close', 'High', 'Low'], loc='upper left')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np 
from pyspark.sql import functions as F

# Assuming 'date' and 'close' are the respective column names in your Spark DataFrame
data = data.withColumn('date', F.to_date('date', 'yyyy-MM-dd'))

# Convert Spark DataFrame to Pandas DataFrame for visualization
data_pd = data.select('date', 'close').toPandas()

# Sort the data by date if it's not sorted already
data_pd.sort_values('date', inplace=True)

# Extracting x and y values for plotting
x = data_pd['date']
y = data_pd['close']

# Plot the stock price data
plt.figure(figsize=(10, 6))
plt.scatter(x, y, label='Close Price')

# Fit a trendline (linear regression) using numpy.polyfit
z = np.polyfit(x.index.values, y, 1)
p = np.poly1d(z)
plt.plot(x, p(x.index.values), color='red', label='Trendline')

plt.title('Close Price with Trendline')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)  # Rotate x-axis labels for better visibility
plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Memperoleh prediksi dari data uji
predictions = test_predictions.select('prediction').toPandas()

# Menambahkan kolom index untuk digunakan sebagai label pada bar chart
predictions['index'] = predictions.index

# Menampilkan bar chart
plt.figure(figsize=(10, 6))
plt.bar(predictions['index'], predictions['prediction'], color='blue')
plt.xlabel('Index')
plt.ylabel('Predicted Close Price')
plt.title('Predicted Close Price Bar Chart')
plt.show()


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

final_data = output.select('features', 'Close').toPandas()
# Menambahkan kolom moving averages ke dalam Pandas DataFrame
final_data['MA_50'] = final_data['Close'].rolling(window=50).mean()
final_data['MA_200'] = final_data['Close'].rolling(window=200).mean()

# Membuat visualisasi line chart
plt.figure(figsize=(12, 8))

# Plot harga penutupan
plt.plot(final_data['Close'], label='Close Price', color='blue')

# Plot moving averages
plt.plot(final_data['MA_50'], label='MA 50 Days', color='orange')
plt.plot(final_data['MA_200'], label='MA 200 Days', color='green')

plt.title('Line Chart with Moving Averages')
plt.xlabel('Days')
plt.ylabel('Price')
plt.legend()
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Lakukan prediksi pada data uji
predictions = model.transform(test_data)

# Ambil kolom Close dan hasil prediksi untuk membandingkannya
close_values = predictions.select('Close').collect()
predicted_values = predictions.select('prediction').collect()

# Ubah hasil prediksi ke dalam list untuk plot
close_values = [row.Close for row in close_values]
predicted_values = [row.prediction for row in predicted_values]

# Buat plot
plt.figure(figsize=(10, 6))
plt.plot(close_values, label='Actual Close')
plt.plot(predicted_values, label='Predicted Close')
plt.title('Comparison between Actual and Predicted Close Values')
plt.xlabel('Index')
plt.ylabel('Close Value')
plt.legend()
plt.show()
