In [4]:
%load_ext kedro.extras.extensions.ipython

The kedro.extras.extensions.ipython extension is already loaded. To reload it, use:
  %reload_ext kedro.extras.extensions.ipython


In [5]:
%reload_kedro

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [7]:
spark = SparkSession.builder.getOrCreate()

In [5]:
schema = StructType([StructField("Ph1", StringType(), False),
                    StructField("Ph2", StringType(), False),
                    StructField("Ir1", StringType(), False),
                    StructField("Fo1", StringType(), False),
                    StructField("Fo2", StringType(), False),
                    StructField("Di3", StringType(), False),
                    StructField("Di4", StringType(), False),
                    StructField("Ph3", StringType(), False),
                    StructField("Ph4", StringType(), False),
                    StructField("Ph5", StringType(), False),
                    StructField("Ph6", StringType(), False),
                    StructField("Co1", StringType(), False),
                    StructField("Co2", StringType(), False),
                    StructField("Co3", StringType(), False),
                    StructField("So1", StringType(), False),
                    StructField("So2", StringType(), False),
                    StructField("Di1", StringType(), False),
                    StructField("Di2", StringType(), False),
                    StructField("Te1", StringType(), False),
                    StructField("Fo3", StringType(), False),
                    StructField("LR1", StringType(), False),
                    StructField("LR2", StringType(), False)])

In [6]:
file_pattern = "/home/sossa/raftel/data/01_raw/Aras/House_A/DAY_*.txt"

In [8]:
data_df = (
    spark.read.text(
        file_pattern
    ).rdd.map(
        lambda x: x[0].split(" ")
    ).toDF(
        schema=schema
    ).withColumn(
    "filename", F.input_file_name()
    ).withColumn(
        "day", F.regexp_extract("filename", r'DAY_(\d+)', 1).cast(IntegerType())
    ).drop(
        "filename"
    )
)

In [9]:
data_df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|Ph1|Ph2|Ir1|Fo1|Fo2|Di3|Di4|Ph3|Ph4|Ph5|Ph6|Co1|Co2|Co3|So1|So2|Di1|Di2|Te1|Fo3|LR1|LR2|day|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 12| 17| 16|
|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 12| 17| 16|
|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 12| 17| 16|
|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 12| 17| 16|
|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 12| 17| 16|
|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 12| 17| 16|
|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 12| 17| 16|
|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0

                                                                                

In [10]:
spark.read.text(
        file_pattern
).withColumn(
    "filename", input_file_name()
).withColumn(
    "day", regexp_extract("filename", r'DAY_(\d+)', 1).cast(IntegerType())
).show(truncate=False)

In [8]:
import pandas as pd

# Define the file names
file_names = ["DAY_{}".format(i) for i in range(1, 31)]

# Create an empty DataFrame to store the data
columns = [
    "Ph1",
    "Ph2",
    "Ir1",
    "Fo1",
    "Fo2",
    "Di3",
    "Di4",
    "Ph3",
    "Ph4",
    "Ph5",
    "Ph6",
    "Co1",
    "Co2",
    "Co3",
    "So1",
    "So2",
    "Di1",
    "Di2",
    "Te1",
    "Fo3",
    "LR1",
    "LR2"
]

df = pd.DataFrame(columns=columns)

# Load data from each file and append to the DataFrame
day = 1
for file_name in file_names:
    file_path = "/home/sossa/raftel/data/01_raw/Aras/House_A/{}.txt".format(file_name)  # assuming the files have a .txt extension
    temp_df = pd.read_csv(file_path, sep=' ', header=None, names=columns)
    temp_df['DAY'] = day
    df = pd.concat([df, temp_df], ignore_index=True)
    day = day + 1

# Print the resulting DataFrame
df['SEC'] = df.index
print(df)

        Ph1 Ph2 Ir1 Fo1 Fo2 Di3 Di4 Ph3 Ph4 Ph5  ... So1 So2 Di1 Di2 Te1 Fo3  \
0         0   0   0   0   0   0   0   0   0   0  ...   0   0   0   0   0   0   
1         0   0   0   0   0   0   0   0   0   0  ...   0   0   0   0   0   0   
2         0   0   0   0   0   0   0   0   0   0  ...   0   0   0   0   0   0   
3         0   0   0   0   0   0   0   0   0   0  ...   0   0   0   0   0   0   
4         0   0   0   0   0   0   0   0   0   0  ...   0   0   0   0   0   0   
...      ..  ..  ..  ..  ..  ..  ..  ..  ..  ..  ...  ..  ..  ..  ..  ..  ..   
2591995   0   0   0   0   1   0   0   0   0   0  ...   0   0   0   0   0   0   
2591996   0   0   0   0   1   0   0   0   0   0  ...   0   0   0   0   0   0   
2591997   0   0   0   0   1   0   0   0   0   0  ...   0   0   0   0   0   0   
2591998   0   0   0   0   1   0   0   0   0   0  ...   0   0   0   0   0   0   
2591999   0   0   0   0   1   0   0   0   0   0  ...   0   0   0   0   0   0   

        LR1 LR2   DAY      SEC  
0     

In [9]:
df_spark = spark.createDataFrame(df)

In [10]:
labels = df_spark.select("LR1", "LR2").distinct().withColumn("LABEL", F.monotonically_increasing_id())

In [11]:
df_spark = df_spark.join(
    labels,
    ["LR1", "LR2"],
    "LEFT"
).orderBy(
    "SEC"
)

In [None]:
df_spark.write.partitionBy("DAY").mode("append").format("delta").option("overwriteSchema", "true").save("/home/sossa/raftel/data/01_raw/aras_a")

In [16]:
df_spark.printSchema()

root
 |-- Ph1: long (nullable = true)
 |-- Ph2: long (nullable = true)
 |-- Ir1: long (nullable = true)
 |-- Fo1: long (nullable = true)
 |-- Fo2: long (nullable = true)
 |-- Di3: long (nullable = true)
 |-- Di4: long (nullable = true)
 |-- Ph3: long (nullable = true)
 |-- Ph4: long (nullable = true)
 |-- Ph5: long (nullable = true)
 |-- Ph6: long (nullable = true)
 |-- Co1: long (nullable = true)
 |-- Co2: long (nullable = true)
 |-- Co3: long (nullable = true)
 |-- So1: long (nullable = true)
 |-- So2: long (nullable = true)
 |-- Di1: long (nullable = true)
 |-- Di2: long (nullable = true)
 |-- Te1: long (nullable = true)
 |-- Fo3: long (nullable = true)
 |-- LR1: long (nullable = true)
 |-- LR2: long (nullable = true)
 |-- DAY: double (nullable = true)
 |-- SEC: long (nullable = true)
 |-- LABEL: integer (nullable = false)



In [12]:
catalog.save("aras_a@spark", df_spark)

24/01/30 13:52:26 WARN TaskSetManager: Stage 0 contains a task of very large size (12755 KiB). The maximum recommended task size is 1000 KiB.
24/01/30 13:52:28 WARN TaskSetManager: Stage 1 contains a task of very large size (12755 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [5]:
df_spark.count()

                                                                                

[1;36m2592000[0m

In [None]:
df_spark.write.parquet ("/home/sossa/raftel/data/01_raw/aras/A.parquet")