In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=ec14d61aef503e302f08da711d5282121df24bff672c5134d7c3a28ffd71b960
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [14]:
import os
import time
import shutil
import zipfile
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

In [15]:
spark = (
    SparkSession.builder
    .appName("StreamingModelApp")
    .getOrCreate()
)

# Explicit schema for the CSV input; field order must match the column order of
# the files. Every column is a nullable IntegerType except the two fractional
# ones, which are read as 32-bit FloatType. Widened to double in the `features`
# vector, those float32 values print as e.g. 0.800000011920929.
schema = StructType([
    StructField(column, FloatType() if column in ("clock_speed", "m_dep") else IntegerType(), True)
    for column in (
        "battery_power", "blue", "clock_speed", "dual_sim", "fc", "four_g",
        "int_memory", "m_dep", "mobile_wt", "n_cores", "pc", "px_height",
        "px_width", "ram", "sc_h", "sc_w", "talk_time", "three_g",
        "touch_screen", "wifi", "price_range",
    )
])

In [16]:
data_path = "train.csv"

# Load the labelled dataset with the explicit schema (header row skipped,
# no type inference).
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv(data_path)
)

# Fixed-seed 50/30/20 split. In the cells shown here only `test2_df` is used
# further on (as the simulated stream); `train_df` and `test_df` are not
# referenced again.
train_df, test_df, test2_df = df.randomSplit([0.5, 0.3, 0.2], seed=42)


In [17]:
streaming_path = "streaming_data"

# The stream later reads from this directory, so start every run from an empty one.
if os.path.exists(streaming_path):
    shutil.rmtree(streaming_path)
os.makedirs(streaming_path, exist_ok=True)


def save_test_data_in_batches(test2_df, batch_size, output_dir):
    """Split a DataFrame into CSV batches of at most ``batch_size`` rows.

    Batch ``i`` is written, with a header row, to ``<output_dir>/batch_<i>``
    (overwriting any existing output there). Rows are taken in the DataFrame's
    current row order, each row lands in exactly one batch, and no empty
    trailing batch is produced.

    Args:
        test2_df: DataFrame to split.
        batch_size: Maximum number of rows per batch; must be positive.
        output_dir: Directory under which the ``batch_<i>`` folders are created.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    # Give every row a fixed 0-based position once and derive its batch id from
    # it. The previous limit()+subtract() loop was unreliable for three reasons.
    # limit() without an ordering may pick different rows each time it is
    # re-evaluated. subtract() is a set difference that also drops duplicate rows.
    # The growing subtract chain made each iteration slower than the last.
    # The un-partitioned window moves the rows into a single partition, which is
    # acceptable for a small hold-out split like this one.
    with_row_id = test2_df.withColumn("_row_id", F.monotonically_increasing_id())
    position = F.row_number().over(Window.orderBy("_row_id")) - 1
    indexed = (
        with_row_id
        .withColumn("_batch_id", F.floor(position / batch_size))
        .drop("_row_id")
        .cache()
    )
    try:
        total_rows = indexed.count()
        num_batches = (total_rows + batch_size - 1) // batch_size  # ceil division
        for i in range(num_batches):
            batch_df = indexed.filter(F.col("_batch_id") == i).drop("_batch_id")
            batch_file_path = os.path.join(output_dir, f"batch_{i}")
            batch_df.write.csv(batch_file_path, mode="overwrite", header=True)
    finally:
        # Release the cached copy once all batches are written.
        indexed.unpersist()


save_test_data_in_batches(test2_df, batch_size=100, output_dir=streaming_path)


In [18]:
model_zip_path = 'lr_model.zip'
unzipped_model_path = 'unzipped_model'

# Extract the saved model archive into a clean directory. Any previous
# extraction is removed first, so stale files cannot mix with the new ones.
if os.path.exists(unzipped_model_path):
    shutil.rmtree(unzipped_model_path)
os.makedirs(unzipped_model_path, exist_ok=True)

with zipfile.ZipFile(model_zip_path) as archive:
    archive.extractall(path=unzipped_model_path)


In [19]:
# The extracted archive is expected to hold the saved PipelineModel in an
# `lr_model` folder.
model_path = os.path.join(unzipped_model_path, "lr_model")
model = PipelineModel.load(model_path)

# Watch `streaming_path` for CSV files. File-based streaming sources need the
# schema up front, so the same explicit schema is reused here.
streaming_df = (
    spark.readStream
    .schema(schema)
    .option("header", "true")
    .csv(streaming_path)
)

# Lazy: predictions are only computed once a streaming query is started on this.
predictions = model.transform(streaming_df)


In [20]:
# Stream the predictions into an in-memory table named "predictions" that can
# be queried with spark.sql; append mode emits only newly arrived rows.
query = (
    predictions.writeStream
    .queryName("predictions")
    .outputMode("append")
    .format("memory")
    .start()
)

def add_new_files_from_batch(batch_dir, destination):
    """Publish the data files of one batch directory into the watched directory.

    Only regular files directly inside ``batch_dir`` are considered. Names
    starting with ``_`` or ``.`` are skipped: these are Spark/Hadoop bookkeeping
    files such as ``_SUCCESS`` and ``.crc`` checksums, which the file source
    ignores anyway. Each file is copied under a hidden temporary name inside
    ``destination`` and then renamed into place, so the file stream source never
    sees a partially written file.

    Args:
        batch_dir: Directory produced by one ``DataFrame.write.csv`` call.
        destination: Directory watched by the file stream source.

    Returns:
        List of the paths published into ``destination``, in name order.
    """
    published = []
    for file_name in sorted(os.listdir(batch_dir)):
        if file_name.startswith(("_", ".")):
            continue
        source_path = os.path.join(batch_dir, file_name)
        if not os.path.isfile(source_path):
            continue
        final_path = os.path.join(destination, file_name)
        # A leading dot hides the file from the stream while it is being copied.
        temp_path = os.path.join(destination, f".{file_name}.tmp")
        shutil.copy(source_path, temp_path)
        # Same directory, hence same filesystem: os.replace is an atomic rename.
        os.replace(temp_path, final_path)
        published.append(final_path)
    return published



In [None]:
BATCH_INTERVAL_SECONDS = 10  # pause between simulated batch arrivals

# Only the batch_<i> directories written earlier count as batches, in numeric
# order (plain sorted() would put batch_10 before batch_2). Re-running this cell
# also finds part files copied to the top level of `streaming_path` by the
# previous run; those are not directories and must be skipped.
batch_dirs = sorted(
    (
        name for name in os.listdir(streaming_path)
        if name.startswith("batch_")
        and name[len("batch_"):].isdigit()
        and os.path.isdir(os.path.join(streaming_path, name))
    ),
    key=lambda name: int(name[len("batch_"):]),
)

try:
    # Simulate data arriving over time: publish one batch, then wait.
    for batch_dir in batch_dirs:
        add_new_files_from_batch(os.path.join(streaming_path, batch_dir), streaming_path)
        time.sleep(BATCH_INTERVAL_SECONDS)

    # Block until every published file has been run through the model into the
    # in-memory "predictions" table (raises if the query has failed).
    query.processAllAvailable()

    spark.sql("SELECT features, prediction FROM predictions").show(5, truncate=False)
finally:
    # An unbounded query.awaitTermination() never returns for a file stream and
    # left this cell hanging; stop the query explicitly instead.
    query.stop()

+-----------------------------------------------------------------------------------------------------------------------------+----------+
|features                                                                                                                     |prediction|
+-----------------------------------------------------------------------------------------------------------------------------+----------+
|[502.0,0.0,0.800000011920929,0.0,7.0,0.0,52.0,1.0,82.0,6.0,8.0,281.0,1159.0,2666.0,5.0,4.0,20.0,1.0,1.0,0.0]                 |1.0       |
|[503.0,1.0,1.7999999523162842,1.0,1.0,1.0,13.0,0.699999988079071,131.0,1.0,4.0,1495.0,1688.0,3117.0,19.0,6.0,9.0,1.0,0.0,1.0]|3.0       |
|[504.0,1.0,0.5,1.0,2.0,1.0,46.0,0.8999999761581421,172.0,5.0,14.0,280.0,1795.0,2085.0,13.0,5.0,8.0,1.0,0.0,0.0]              |1.0       |
|[507.0,1.0,0.5,1.0,1.0,0.0,32.0,0.5,141.0,7.0,11.0,936.0,1398.0,1702.0,17.0,0.0,5.0,1.0,1.0,1.0]                             |1.0       |
|[510.0,0.0,1.7000000476837