In [14]:
!pip install pyspark

Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.5/200.5 kB 2.0 MB/s eta 0:00:00
Installing collected packages: py4j
Successfully installed py4j-0.10.9.7


In [1]:
import os

from pyspark.sql import SparkSession

# Local directory that receives the Spark event logs. Spark does not create the
# event-log base directory on its own and fails while starting the context if
# event logging is enabled and the directory is missing, so create it up front.
event_log_dir = "/home/jovyan/work/logs"
os.makedirs(event_log_dir, exist_ok=True)

# Create (or reuse) a Spark session with event logging enabled
spark = (
    SparkSession.builder
    .appName("SparkExample")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", f"file:{event_log_dir}")
    .getOrCreate()
)

# Hostname (or IP address) of the HDFS NameNode
hadoop_namenode_address = "namenode"

# HDFS directory that holds the stock-price CSV data
csv_path = f"hdfs://{hadoop_namenode_address}:9000/tmp/stockprice/"

# Read the CSV data into a PySpark DataFrame; the first line is treated as the
# header and column types are inferred from the data
df_from_hdfs = spark.read.csv(csv_path, header=True, inferSchema=True)

# Preview the first five rows
df_from_hdfs.show(5)

+------------+------+-------------+------------+-------+----+----+---------+-------------+
|ClosingPrice|Change|TradingVolume|TradingValue|Opening|High| Low|MarketCap|FullMarketCap|
+------------+------+-------------+------------+-------+----+----+---------+-------------+
|        7280|     0|         4520|    32820940|   7240|7280|7240|   633165|      3480379|
|        7280|     0|         1697|    12353920|   7280|7300|7260|   633165|      3480379|
|        7280|    20|         2390|    17416620|   7300|7300|7280|   633165|      3480379|
|        7300|    20|         4511|    32848700|   7280|7300|7260|   634904|      3489941|
|        7280|    20|         2623|    19134420|   7280|7300|7280|   633165|      3480379|
+------------+------+-------------+------------+-------+----+----+---------+-------------+
only showing top 5 rows



In [2]:
# Column count comes from the schema; the row count requires a pass over the data
num_cols = len(df_from_hdfs.columns)
num_rows = df_from_hdfs.count()

# Report the dimensions as a (rows, columns) pair
df_shape = (num_rows, num_cols)
print("Shape of the DataFrame: ({}, {})".format(*df_shape))

Shape of the DataFrame: (1215, 9)


In [3]:
# Print the DataFrame schema as a tree: one line per column showing its name,
# the type inferred while reading the CSV, and whether it is nullable
df_from_hdfs.printSchema()

root
 |-- ClosingPrice: integer (nullable = true)
 |-- Change: integer (nullable = true)
 |-- TradingVolume: integer (nullable = true)
 |-- TradingValue: long (nullable = true)
 |-- Opening: integer (nullable = true)
 |-- High: integer (nullable = true)
 |-- Low: integer (nullable = true)
 |-- MarketCap: integer (nullable = true)
 |-- FullMarketCap: integer (nullable = true)



In [4]:
from sklearn.svm import SVR
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col

# Column to predict; every other column of the DataFrame is used as a feature.
# Selecting by name (instead of slicing off the first column) keeps the target
# out of the feature set even if the column order of the input changes.
target_column = "ClosingPrice"
feature_columns = [name for name in df_from_hdfs.columns if name != target_column]
# NOTE(review): in the rows displayed by show(5), MarketCap and FullMarketCap are
# a constant multiple of ClosingPrice, which would leak the target into the
# features and explain a near-zero RMSE - confirm and consider dropping them.

# Combine the feature columns into the single vector column Spark ML expects
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# scikit-learn SVR instance. It is NOT part of the Spark pipeline below and is
# never fitted in this cell; it is kept only so the name stays defined.
svm_model = SVR(kernel='linear', C=10, gamma='scale')

# The estimator that is actually trained is Spark ML linear regression, not an
# SVM. `svm` is kept as an alias in case other cells refer to the old name.
linear_regression = LinearRegression(featuresCol="features", labelCol=target_column, maxIter=100)
svm = linear_regression

# Pipeline: assemble the feature vector, then fit the linear regression
pipeline = Pipeline(stages=[vector_assembler, linear_regression])

# Split the data into training and testing sets (80% train, 20% test)
train_data, test_data = df_from_hdfs.randomSplit([0.8, 0.2], seed=123)

# Fit the pipeline on the training split
model = pipeline.fit(train_data)

# Predict on the held-out split; adds a "prediction" column
predictions = model.transform(test_data)

# Evaluate the predictions against the true target using RMSE
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data:", rmse)

Root Mean Squared Error (RMSE) on test data: 0.0005838904298389542
