In [None]:
# Cai dat thu vien pyspark
!pip install pyspark



In [None]:
# truy cap drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession

# Khởi tạo một SparkSession
spark = SparkSession.builder \
    .appName("DistributedQueryExample") \
    .getOrCreate()

In [None]:
# Đọc dữ liệu từ file CSV và tạo DataFrame
file_path = '/content/drive/MyDrive/Du_lieu/output.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [None]:
# Đăng ký DataFrame như một bảng tạm thời để sử dụng với Spark SQL
df.createOrReplaceTempView("orders")
output = spark.sql("SELECT * FROM orders")
output.show() # show hết

+---+-----+-------+-----+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+
|cut|color|clarity|price|              carat|               depth|               table|                  x|                  y|                  z|
+---+-----+-------+-----+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+
|4.0|  5.0|    1.0|  326|-1.1993754756261739|-0.17525447896316393| -1.1047442052867857|-1.5926160012053432| -1.578906966589539| -1.605760724167961|
|3.0|  5.0|    2.0|  326|-1.2416468161691938| -1.3730226804225496|  1.5922470814822693|-1.6462568948132152|-1.7049807814310218|-1.7794544223523872|
|1.0|  5.0|    4.0|  327|-1.1993754756261739| -3.4162743182062028|  3.3902412726616395|  -1.50321451185889|-1.4978595141914426|-1.7794544223523872|
|3.0|  1.0|    3.0|  334|-1.0725614539971142| 0.45885809828003815| 0.24375143809774188|-1.3691122778392095| -1.3

In [None]:
# truy theo dieu kien
# SQL đơn giản: SELECT FROM WHERE
# result : Bảng dữ liệu
#Lọc các dòng có price lớn hơn 1000 và carat nhỏ hơn 1, sau đó sắp xếp theo price giảm dần và carat tăng dần:
result1 = df.filter("price > 1000 AND carat < 1") \
            .select("cut", "color", "carat", "price") \
            .orderBy(df['price'].desc(), df['carat'].asc())
result1.show()
#Lọc các dòng có carat lớn hơn 1
result2 = df.filter("carat > 1").select("cut", "color", "carat", "price")
result2.show()
#Lọc các dòng có price nhỏ hơn 5000 và sắp xếp theo giá trị price giảm dần
result3 = df.filter("price < 5000").select("cut", "color", "carat", "price").orderBy(df['price'].desc())
result3.show()
# Nhóm theo cut và color, sau đó tính tổng carat của từng nhóm
result4 = df.filter("price > 1000").select("cut", "color", "carat").groupby("cut", "color").agg({'carat': 'sum'})
result4.show()
#Kết nối hai DataFrame (result2 và result3) dựa trên cột cut
result5 = result2.join(result3, on='cut', how='inner')
result5.show()


+---+-----+-------------------+-----+
|cut|color|              carat|price|
+---+-----+-------------------+-----+
|2.0|  6.0| 0.5126138163661329|18542|
|3.0|  6.0| 0.5760208271806628|18279|
|2.0|  6.0| 0.6182921677236828|18231|
|2.0|  6.0| 0.5760208271806628|18114|
|2.0|  6.0| 0.7239705190812322|18112|
|2.0|  6.0| 0.5760208271806628|17909|
|4.0|  6.0| 0.4914781460946229|17590|
|1.0|  6.0| 0.7239705190812322|17499|
|4.0|  5.0| 0.8719202109818021|17353|
|3.0|  6.0| 0.8719202109818021|17192|
|2.0|  6.0|0.47034247582311295|17100|
|4.0|  6.0| 0.5760208271806628|17042|
|4.0|  5.0|  0.956462892067842|16823|
|4.0|  6.0|  0.428071135280093|16469|
|1.0|  6.0| 0.6182921677236828|16406|
|4.0|  5.0| 0.8507845407102921|16256|
|4.0|  6.0|  0.914191551524822|16253|
|3.0|  6.0|  0.449206805551603|16234|
|4.0|  6.0| 0.8507845407102921|16130|
|2.0|  6.0|  0.449206805551603|16087|
+---+-----+-------------------+-----+
only showing top 20 rows

+---+-----+------------------+-----+
|cut|color|             c

LẬP BẢNG MÔ TẢ

In [None]:
# thư viện thêm
from pyspark.sql.functions import *
from pyspark.sql.types import NumericType
from pyspark.sql import Row

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType
from pyspark.sql import Row


# Kiểm tra nếu SparkContext đã được khởi tạo
if 'spark' not in globals():
    spark = SparkSession.builder.appName("YourAppName").getOrCreate()

# Lấy danh sách các cột số
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]

# Tạo list để lưu trữ các Row
rows = []

for col in numeric_cols:
    try:
        # Tính toán thống kê cho từng cột và sử dụng show thay vì collect
        kq = df.select(
            F.max(col).alias('max'),
            F.min(col).alias('min'),
            F.stddev(col).alias('std'),
            F.variance(col).alias('var'),
            F.percentile_approx(col, 0.25, 10000000).alias('Q1'),
            F.percentile_approx(col, 0.5).alias('Q2'),
            F.percentile_approx(col, 0.75).alias('Q3'),
            F.mean(col).alias('mean'),        # Trung bình
            F.skewness(col).alias('skewness'), # Độ lệch
            F.kurtosis(col).alias('kurtosis'), # Độ nhọn
            F.count(col).alias('count')       # Số lượng giá trị
        ).head()  # Chỉ lấy một dòng

        # Kiểm tra nếu có kết quả
        if kq:
            kq_dict = kq.asDict()

            # Chuyển tất cả giá trị sang kiểu float (hoặc double) để tránh lỗi kiểu dữ liệu
            rows.append(Row(feature='max', value=float(kq_dict['max']), column=col))
            rows.append(Row(feature='min', value=float(kq_dict['min']), column=col))
            rows.append(Row(feature='std', value=float(kq_dict['std']), column=col))
            rows.append(Row(feature='var', value=float(kq_dict['var']), column=col))
            rows.append(Row(feature='Q1', value=float(kq_dict['Q1']), column=col))
            rows.append(Row(feature='Q2', value=float(kq_dict['Q2']), column=col))
            rows.append(Row(feature='Q3', value=float(kq_dict['Q3']), column=col))
            rows.append(Row(feature='mean', value=float(kq_dict['mean']), column=col))         # Trung bình
            rows.append(Row(feature='skewness', value=float(kq_dict['skewness']), column=col))  # Độ lệch
            rows.append(Row(feature='kurtosis', value=float(kq_dict['kurtosis']), column=col))  # Độ nhọn
            rows.append(Row(feature='count', value=float(kq_dict['count']), column=col))        # Số lượng
    except Exception as e:
        print(f"Đã xảy ra lỗi khi xử lý cột {col}: {e}")

# Tạo DataFrame từ list các Row
summary_df = spark.createDataFrame(rows)

# Kiểm tra schema của DataFrame
summary_df.printSchema()

# Pivot bảng để có dạng mong muốn
summary_df = summary_df.groupBy("feature").pivot("column").agg(F.first("value"))

# Hiển thị kết quả
summary_df.show()


# Dừng SparkSession khi hoàn tất
spark.stop()

root
 |-- feature: string (nullable = true)
 |-- value: double (nullable = true)
 |-- column: string (nullable = true)

+--------+--------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| feature|               carat|            clarity|              color|                cut|               depth|               price|               table|                   x|                   y|                   z|
+--------+--------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     std|  1.0000093005519834| 1.6468205220747467|  1.701268865131659|  1.115817054691567|  1.0000093005519868|   3985.851342539849|  1.0000093005519923|   1.000009300551988|  1.0000093005519948|  1.0000093005519914|
|skewnes

 Linear Regression với Spark MLlib


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator



# Kiểm tra nếu SparkSession đã bị dừng, tạo lại SparkSession
try:
    spark = SparkSession.builder.getOrCreate()
except Exception as e:
    print("Error in getting Spark session:", e)
    spark = SparkSession.builder.master("local[*]").appName("Diamond Price Prediction").getOrCreate()

# Giả sử df là DataFrame của bạn
# df = spark.read.csv("diamond_data.csv", header=True, inferSchema=True)

# Cập nhật cột đặc trưng và mục tiêu
feature_cols = ['carat', 'depth', 'table', 'x', 'y', 'z', 'cut', 'color', 'clarity']  # Cột đặc trưng
label_col = 'price'  # Cột mục tiêu (label)

# Bước 1: Kiểm tra và loại bỏ cột "features" nếu có
if "features" in df.columns:
    df = df.drop("features")

# Tạo cột features bằng cách sử dụng VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Bước 2: Chia dữ liệu thành tập huấn luyện và kiểm tra
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

# Bước 3: Tạo mô hình Linear Regression
lr = LinearRegression(featuresCol="features", labelCol=label_col)

# Bước 4: Huấn luyện mô hình
lr_model = lr.fit(train_data)

# Bước 5: Dự đoán trên dữ liệu kiểm tra
predictions = lr_model.transform(test_data)

# Bước 6: Hiển thị kết quả dự đoán
predictions.select("features", label_col, "prediction").show()

# Bước 7: Đánh giá mô hình (MSE, RMSE, R2)
evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) trên tập kiểm tra: {rmse}")

# Đánh giá R2
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(f"R2 Score trên tập kiểm tra: {r2}")

# Dừng SparkSession khi hoàn tất
spark.stop()


+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|[0.11103608120744...| 1334| 757.6712346208718|
|[2.18233176781541...| 5083|  9437.06602470076|
|[0.49147814609462...| 3149|2854.8686370758055|
|[0.59715649745217...| 3302| 3238.888763303011|
|[0.72397051908123...| 3403| 4123.020433563479|
|[0.87192021098180...| 3984| 4494.561300401276|
|[0.66056350826670...| 4056|4089.9085515238557|
|[3.64069301654960...|15776|16919.505192659486|
|[0.42807113528009...| 3427| 3234.471768950449|
|[0.70283484880972...| 4123| 4660.252558288186|
|[1.16781959478294...| 5524|6662.6403754325365|
|[2.14006042727239...| 8362| 11578.08992464833|
|[2.54163816243108...|11189|12131.525369138835|
|[2.56277383270259...|11212|12863.659403610525|
|[-0.3962200053087...| 1174|156.91987459473796|
|[0.23785010283650...| 2854|3011.0031182608172|
|[0.44920680555160...| 3886|4051.5032236771626|
|[0.59715649745217...| 3980| 4508.845160

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Tạo hoặc khởi động SparkSession
try:
    spark = SparkSession.builder.getOrCreate()
except Exception as e:
    print("Error in getting Spark session:", e)
    spark = SparkSession.builder.master("local[*]").appName("Diamond Price Prediction").getOrCreate()

# Đọc dữ liệu
# Giả sử bạn đã có DataFrame df
# df = spark.read.csv("diamond_data.csv", header=True, inferSchema=True)

# Hiển thị thông tin dữ liệu
df.printSchema()
df.show(5)

# Kiểm tra các cột phân loại và số
categorical_cols = ['cut', 'color', 'clarity']  # Cột phân loại
numeric_cols = ['carat', 'depth', 'table', 'x', 'y', 'z']  # Cột số
label_col = 'price'  # Cột mục tiêu

# Kiểm tra và chuyển đổi kiểu dữ liệu của cột mục tiêu
df = df.withColumn(label_col, col(label_col).cast("double"))

# Xử lý cột phân loại bằng StringIndexer
for col_name in categorical_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index")
    df = indexer.fit(df).transform(df)

# Cập nhật danh sách cột đặc trưng sau khi mã hóa
indexed_cols = [f"{col_name}_index" for col_name in categorical_cols]
feature_cols = numeric_cols + indexed_cols

# Loại bỏ giá trị null
df = df.na.drop(subset=feature_cols + [label_col])

# Tạo cột features bằng VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Chia dữ liệu thành tập huấn luyện và kiểm tra
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

# Kiểm tra số lượng dòng trong các tập dữ liệu
print(f"Số dòng tập huấn luyện: {train_data.count()}")
print(f"Số dòng tập kiểm tra: {test_data.count()}")

# Tạo mô hình Linear Regression
lr = LinearRegression(featuresCol="features", labelCol=label_col)

# Huấn luyện mô hình
lr_model = lr.fit(train_data)

# Dự đoán trên tập kiểm tra
predictions = lr_model.transform(test_data)

# Hiển thị một số dự đoán
predictions.select("features", label_col, "prediction").show(5)

# Đánh giá mô hình
evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction")

# Tính RMSE
rmse = evaluator.setMetricName("rmse").evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Tính R2
r2 = evaluator.setMetricName("r2").evaluate(predictions)
print(f"R2 Score: {r2}")

# Dừng SparkSession khi hoàn tất
spark.stop()


root
 |-- cut: double (nullable = true)
 |-- color: double (nullable = true)
 |-- clarity: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- carat: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

+---+-----+-------+-----+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+
|cut|color|clarity|price|              carat|               depth|              table|                  x|                  y|                  z|
+---+-----+-------+-----+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+
|4.0|  5.0|    1.0|  326|-1.1993754756261739|-0.17525447896316393|-1.1047442052867857|-1.5926160012053432| -1.578906966589539| -1.605760724167961|
|3.0|  5.0|    2.0|  326|-1.2416468161691938| -

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Tạo SparkSession
spark = SparkSession.builder.master("local[*]").appName("Diamond Price Prediction").getOrCreate()

# Giả sử bạn đã có DataFrame df chứa dữ liệu
# df = spark.read.csv("diamond_data.csv", header=True, inferSchema=True)

# Danh sách các cột đặc trưng và cột mục tiêu
feature_cols = ['carat', 'depth', 'table', 'x', 'y', 'z']  # Các cột đặc trưng phải là số
label_col = 'price'  # Cột mục tiêu

# Tiền xử lý: Chuyển đổi các cột đặc trưng thành một cột vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Chia dữ liệu thành tập huấn luyện và kiểm tra
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

# Tạo mô hình Linear Regression
lr = LinearRegression(featuresCol="features", labelCol=label_col)

# Huấn luyện mô hình
lr_model = lr.fit(train_data)

# Dự đoán trên tập kiểm tra
predictions = lr_model.transform(test_data)

# Tính toán và in kết quả MSE
mse_evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="mse")
mse = mse_evaluator.evaluate(predictions)
print(f"Mean Squared Error (MSE): {mse}")

# Tính toán và in kết quả RMSE
rmse_evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Tính toán và in kết quả R2
r2_evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R2 Score: {r2}")

# Dừng SparkSession
spark.stop()


Mean Squared Error (MSE): 2066654.9010969105
Root Mean Squared Error (RMSE): 1437.5864847364526
R2 Score: 0.8688474180116172
