#### Objective:
- use the kernel from `Anaconda` to use `pyspark` package
- EDA with spark on compressed file

- Repartitioning the spark dataframe:
    - If plan to write this DataFrame to disk, especially as Parquet files, repartitioning can help control the number and size of output files.

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F
## schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

* install parquet library for pandas

In [2]:
# pip install pyspark

In [3]:
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext

In [4]:
# pip install fastparquet

#### Create Spark context with `SparkConf`

In [5]:
cluster_url= 'local[*]'
# create config file
config = SparkConf()\
    .setAppName('readAWSCrawlData')\
    .setMaster(cluster_url)

In [6]:
sc = SparkContext(conf = config)

## 2. Another way of creating Spark Session

=>
    config= SparkConf()\
        .setAppName('readAWSCrawlData')\
        .setMaster('local[*]')


spark = SparkSession \
    .builder \
    .config(conf= config)\
    .getOrCreate()


### Read the compressed file 

In [7]:
# df_aws_zipped= sc.read\
#     .format("csv")\
#     .option("compression", "gzip")\
#     .option("header", True)\
#     .load("data_sources/AWS_web_crawl/cdx-00005.gz")

### Read with spark session

In [8]:
config= SparkConf()\
    .setAppName('readAWSCrawlData')\
    .setMaster('local[*]')


spark = SparkSession \
    .builder \
    .config(conf= config)\
    .getOrCreate()

#### Path to GZIP files

In [9]:
path= "data/CC-MAIN-2024-33/cdx-00001.gz"

path_2= "data/CC-MAIN-2024-33/cdx-00008.gz"

In [10]:
# df_zipped= spark.read\
#     .format("csv")\
#     .option("compression", "gzip")\
#     .option("header", False)\
#     .load("data_sources/AWS_web_crawl/cdx-00005.gz")


df_zipped= spark.read\
    .format("csv")\
    .option("compression", "gzip")\
    .option("header", False)\
    .load(path_2)

In [11]:
df_zipped.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [12]:
df_zipped.rdd.getNumPartitions()

1

In [13]:
df_zipped.show(2)

+---+--------------------+--------------------+--------------------+----------------+--------------------+----------------+--------------------+--------------------+--------------------+
|_c0|                 _c1|                 _c2|                 _c3|             _c4|                 _c5|             _c6|                 _c7|                 _c8|                 _c9|
+---+--------------------+--------------------+--------------------+----------------+--------------------+----------------+--------------------+--------------------+--------------------+
| be|trois-frontieres)...| "mime": "text/html"| "mime-detected":...| "status": "302"| "digest": "ZO5BO...| "length": "695"| "offset": "26539...| "filename": "cra...| "redirect": "htt...|
| be|trois-frontieres)...| "mime": "text/html"| "mime-detected":...| "status": "302"| "digest": "ZO5BO...| "length": "690"| "offset": "25627...| "filename": "cra...| "redirect": "htt...|
+---+--------------------+--------------------+------------------

## Change the Column Names

In [14]:
## shcema
df_zipped_ = df_zipped\
    .withColumn("c1_", F.col("_c0").cast(StringType())).drop("_c0")\
    .withColumn("c2_", F.col("_c1").cast(StringType())).drop("_c1")\
    .withColumn("url", F.col("_c2").cast(StringType())).drop("_c2")\
    .withColumn("mime", F.col("_c3").cast(StringType())).drop("_c3")\
    .withColumn("mime_type", F.col("_c4").cast(StringType())).drop("_c4")\
    .withColumn("status", F.col("_c5").cast(StringType())).drop("_c5")\
    .withColumn("digest_offset", F.col("_c6").cast(StringType())).drop("_c6")\
    .withColumn("length", F.col("_c7").cast(StringType())).drop("_c7")\
    .withColumn("offset", F.col("_c8").cast(StringType())).drop("_c8")\
    .withColumn("filename", F.col("_c9").cast(StringType())).drop("_c9")\
    # .withColumn("content_charset", F.col("_c10").cast(StringType())).drop("_c10")\
    # .withColumn("language", F.col("_c11").cast(StringType())).drop("_c11")


In [15]:
df_zipped_.show(2)

+---+--------------------+--------------------+--------------------+----------------+--------------------+----------------+--------------------+--------------------+--------------------+
|c1_|                 c2_|                 url|                mime|       mime_type|              status|   digest_offset|              length|              offset|            filename|
+---+--------------------+--------------------+--------------------+----------------+--------------------+----------------+--------------------+--------------------+--------------------+
| be|trois-frontieres)...| "mime": "text/html"| "mime-detected":...| "status": "302"| "digest": "ZO5BO...| "length": "695"| "offset": "26539...| "filename": "cra...| "redirect": "htt...|
| be|trois-frontieres)...| "mime": "text/html"| "mime-detected":...| "status": "302"| "digest": "ZO5BO...| "length": "690"| "offset": "25627...| "filename": "cra...| "redirect": "htt...|
+---+--------------------+--------------------+------------------

##### Save as Parquet format

In [16]:
df_zipped_.rdd.getNumPartitions()

1

In [17]:
df_zipped_=df_zipped_.repartition(20)

In [18]:
df_zipped_.rdd.getNumPartitions()

20

In [None]:
%%time
df_zipped_.write.parquet("2024_00008.parquet")

In [None]:
# %%time
# df_zipped_.write.mode("overwrite").parquet("data/2024_00008.parquet")

In [18]:
df_zipped.select(df_zipped.columns[:3]).take(15)

[Row(_c0='be', _c1='trois-frontieres)/f/n/plans_cartes.php 20240812104747 {"url": "https://www.trois-frontieres.be/F/N/plans_cartes.php"', _c2=' "mime": "text/html"'),
 Row(_c0='be', _c1='trois-frontieres)/f/n/sources.php 20240812110044 {"url": "https://www.trois-frontieres.be/F/N/sources.php"', _c2=' "mime": "text/html"'),
 Row(_c0='be', _c1='trois-frontieres)/f/n/trains.php 20240812112818 {"url": "https://www.trois-frontieres.be/F/N/trains.php"', _c2=' "mime": "text/html"'),
 Row(_c0='be', _c1='trois-frontieres)/f/n/viaduc.php 20240812100754 {"url": "https://www.trois-frontieres.be/F/N/viaduc.php"', _c2=' "mime": "text/html"'),
 Row(_c0='be', _c1='trois-frontieres)/f/n/welkenraedt_tout_savoir.php 20240812103816 {"url": "https://www.trois-frontieres.be/F/N/welkenraedt_tout_savoir.php"', _c2=' "mime": "text/html"'),
 Row(_c0='be', _c1='trois-frontieres)/f/nom_fam_prof.php 20240812004417 {"url": "https://www.trois-frontieres.be/F/nom_fam_prof.php"', _c2=' "mime": "text/html"'),
 Row(_c0

In [17]:
df_zipped_.count()

9858933

### Partition
- To allow all the spark executors to process on sub-folders, rather handling one single Large file
- It is a lazy function

### NOTE: parition number
- For 4 Cores, starting with 16-32 paritions and adjusting based on the actual performance of `Spark Jobs`

In [13]:
# df_zipped= df_zipped.repartition(4)

#### Write the partitioned file to dir.

In [14]:
# %%time
# df_zipped.write.parquet("data_sources/aws_crawl_00005/")

In [15]:
# df_zipped.rdd.getNumPartitions()

### Perform Substring of the URL column

#### Custom function
- user-defined function to split the column `url`

In [16]:
## funciton to perform substring on Full URL path
def process_url(df):

    ## replace the NaN with NONE value
    # df= df.dropna()

    #df["url"]= df["url"].replace({ np.nan: 0})

    ## list of substrings
    ip_path= []
    resource_path= []
    time_stamp = []
    url= []

    
    # iterate over each row
    for j in range(0, len(df)):

#        if pd.notna(df["url"][j]):

        # split the url
        full_url= df["url"][j].split(' ')        
            
        # Get full path
        ip_resource= full_url[0].split(")")
        
        # get IP path
        ip_path.append(ip_resource[0])
        # Get Resource path
        resource_path.append(ip_resource[1])
    
        # get timestamp
        time_stamp.append(full_url[1])
        # Get the full URL
        url.append(full_url[3].replace('"', ''))

            
    df_new= df.copy()

    # Append the values
    df_new["Ip_path"]= ip_path
    df_new["Resource_path"]= resource_path
    df_new["TimeStamp"]= time_stamp
    df_new["URL"] = url

    ## drop URL column
    df_new= df_new.drop(columns= ["url"])

    return df_new

### Partitioning
Partitioning data in Apache Spark is a fundamental concept that plays a crucial role in the performance and efficiency of your Spark applications. Here’s a detailed explanation of what partitioning does and why it is important:

### What Partitioning Does

Partitioning in Spark refers to dividing a large dataset into smaller chunks, or partitions, which can be processed in parallel across the nodes of a cluster. Each partition is a logical division of the data that allows Spark to distribute and process the data concurrently.

### Importance of Partitioning

1. **Parallelism**:
   - **Improved Parallel Processing**: Partitioning enables parallel processing by allowing multiple tasks to work on different parts of the data simultaneously. This can significantly reduce the overall processing time.
   - **Optimal Resource Utilization**: By distributing tasks across available cores and nodes, Spark can better utilize the cluster's computational resources.

2. **Efficiency**:
   - **Data Locality**: Partitions can be processed on the node where the data resides, reducing the need for data transfer over the network and thus enhancing performance.
   - **Balanced Workload**: Proper partitioning ensures that the workload is evenly distributed across the cluster, preventing some nodes from being overloaded while others are idle.

3. **Memory Management**:
   - **Avoiding Out of Memory Errors**: Smaller partitions are easier to fit into memory, reducing the likelihood of out-of-memory errors and enabling more efficient garbage collection.

4. **Fault Tolerance**:
   - **Resilience to Failures**: If a node fails, only the tasks related to the partitions on that node need to be re-executed, rather than the entire job. This makes Spark applications more resilient to hardware failures.

5. **Scalability**:
   - **Handling Large Datasets**: Partitioning allows Spark to handle large datasets by breaking them into manageable chunks. This scalability is essential for big data processing.

### How Partitioning Works in Spark

- **Default Partitioning**: When you create an RDD or DataFrame, Spark automatically partitions it based on default settings or the source data's natural partitions (e.g., HDFS blocks).
- **Custom Partitioning**: You can explicitly control the number of partitions using operations like `repartition`, `coalesce`, and during data loading with options like `numPartitions`.

### Key Partitioning Operations

1. **repartition**:
   - **Description**: Increases or decreases the number of partitions and involves a full shuffle of the data, redistributing it across the nodes.
   - **Use Case**: When you need to increase parallelism or balance the data more evenly before a shuffle-intensive operation.

2. **coalesce**:
   - **Description**: Reduces the number of partitions without a full shuffle, typically by merging partitions within the same node.
   - **Use Case**: When you want to decrease the number of partitions for efficiency, often used after filtering operations to reduce the number of partitions.

3. **partitionBy** (for DataFrames):
   - **Description**: Specifies a column to partition the data by when writing it out to a file system.
   - **Use Case**: Optimizes data layout for future reads, especially for queries that filter by the partitioned column.


### Read the Parition the data

### Schema
- assign column names when reading file

In [17]:
## Assign column name
new_col_names= ["c1_", "c2_", "url", "mime", "mime_type", "status", "digest_offset", "length", "offset", "filename",
                "content_charset", "language"]

# ## shcema
# schema_ = StructType([
#     StructField("c1_", StringType(), True),\
#     StructField("c2_", StringType(), True),\
#     StructField("url", StringType(), True), \
#     StructField("mime", StringType(), True), \
#     StructField("mime_type", StringType(), True), \
#     StructField("status", StringType(), True), \
#     StructField("digest_offset", StringType(), True), \
#     StructField("length", StringType(), True), \
#     StructField("offset", StringType(), True), \
#     StructField("filename", StringType(), True), \
#     StructField("content_charset", StringType(), True), \
#     StructField("language", StringType(), False)
# ])

In [18]:
%%time
## infer schema for the column header and data type
## currently string type for all columns only
#    .schema(schema_)\

df_new= spark.read.format("parquet")\
    .load("data_sources/aws_crawl_00005/part-00000-4878556a-7ef5-4524-b862-7535a49b03be-c000.snappy.parquet")


CPU times: user 4.15 ms, sys: 19 µs, total: 4.17 ms
Wall time: 983 ms


In [19]:
df_new= df_new\
    .withColumn("c1_", F.col("_c0").cast(StringType())).drop("_c0")\
    .withColumn("c2_", F.col("_c1").cast(StringType())).drop("_c1")\
    .withColumn("url", F.col("_c2").cast(StringType())).drop("_c2")\
    .withColumn("mime", F.col("_c3").cast(StringType())).drop("_c3")\
    .withColumn("mime_type", F.col("_c4").cast(StringType())).drop("_c4")\
    .withColumn("status", F.col("_c5").cast(StringType())).drop("_c5")\
    .withColumn("digest_offset", F.col("_c6").cast(StringType())).drop("_c6")\
    .withColumn("length", F.col("_c7").cast(StringType())).drop("_c7")\
    .withColumn("offset", F.col("_c8").cast(StringType())).drop("_c8")\
    .withColumn("filename", F.col("_c9").cast(StringType())).drop("_c9")\
    .withColumn("content_charset", F.col("_c10").cast(StringType())).drop("_c10")\
    .withColumn("language", F.col("_c11").cast(StringType())).drop("_c11")

In [20]:
df_new.printSchema()

root
 |-- c1_: string (nullable = true)
 |-- c2_: string (nullable = true)
 |-- url: string (nullable = true)
 |-- mime: string (nullable = true)
 |-- mime_type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- digest_offset: string (nullable = true)
 |-- length: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- content_charset: string (nullable = true)
 |-- language: string (nullable = true)



In [21]:
df_new.show(2)

+---+---+--------------------+--------------------+--------------------+----------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+
|c1_|c2_|                 url|                mime|           mime_type|          status|       digest_offset|             length|              offset|            filename|    content_charset|            language|
+---+---+--------------------+--------------------+--------------------+----------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+
| au|com|syc)/syc-adopt-pa...| "mime": "text/html"| "mime-detected":...| "status": "200"| "digest": "3V3JO...|  "length": "19362"| "offset": "63222...| "filename": "cra...| "charset": "UTF-8"| "languages": "eng"}|
| au|com|terryflynnbooks)/...| "mime": "text/html"| "mime-detected":...| "status": "200"| "digest": "SGOZL...| "length": "170686"| "offset": "11

In [22]:
df_new.count()

4051896

In [23]:
df_new.rdd.getNumPartitions()

5

### User-Defined Function
- Register the `udf` to use with column 

In [24]:
# First, Split by space
first_split= F.split(df_new["url"], " ")
df_new = df_new.withColumn("ip_and_resource", first_split.getItem(0))\
        .withColumn("time_stamp", first_split.getItem(1))\
        .withColumn("URL", first_split.getItem(3))

In [25]:
df_new.printSchema()

root
 |-- c1_: string (nullable = true)
 |-- c2_: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- mime: string (nullable = true)
 |-- mime_type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- digest_offset: string (nullable = true)
 |-- length: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- content_charset: string (nullable = true)
 |-- language: string (nullable = true)
 |-- ip_and_resource: string (nullable = true)
 |-- time_stamp: string (nullable = true)



In [26]:
df_new.show(2)

+---+---+--------------------+--------------------+--------------------+----------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------+
|c1_|c2_|                 URL|                mime|           mime_type|          status|       digest_offset|             length|              offset|            filename|    content_charset|            language|     ip_and_resource|    time_stamp|
+---+---+--------------------+--------------------+--------------------+----------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------+
| au|com|"https://syc.com....| "mime": "text/html"| "mime-detected":...| "status": "200"| "digest": "3V3JO...|  "length": "19362"| "offset": "63222...| "filename": "cra...| "charset": "UTF-8"| "languages": "eng"}|syc)/syc-adopt-pa...|20231204203310|


#### Second split; on ip and resource

In [27]:
split_col= F.split(df_new['ip_and_resource'], "\)")
df_new = df_new.withColumn('ip_path', split_col.getItem(0))\
            .withColumn('resource', split_col.getItem(1))

# convert the time stamp
#df_new= df_new.withColumn('DateTime', F.to_timestamp(F.col('time_stamp'), "yyyyMMddHHmmss")).show(2)

In [28]:
## Drop the columns
#df_new= df_new.drop(*['ip_and_resource', 'time_stamp'])
df_new.show(2)

+---+---+--------------------+--------------------+--------------------+----------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------+---------------+--------------------+
|c1_|c2_|                 URL|                mime|           mime_type|          status|       digest_offset|             length|              offset|            filename|    content_charset|            language|     ip_and_resource|    time_stamp|        ip_path|            resource|
+---+---+--------------------+--------------------+--------------------+----------------+--------------------+-------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------+---------------+--------------------+
| au|com|"https://syc.com....| "mime": "text/html"| "mime-detected":...| "status": "200"| "digest": "3V3JO...|  "length": "19362"| "offset"

In [30]:
df_new.rdd.getNumPartitions()

5

#### NOTE: Saves files in different folder as the data has been paritioned
- 

In [39]:
# df_new.write.csv('data_sources/aws_crawl_00005/first_part_csv')

<pyspark.sql.readwriter.DataFrameWriter at 0x7df45acb2650>

In [42]:
## using coalase

df_new.repartition(1)\
    .write.format("data_sources/aws_crawl_00005/first_par_csv")\
    .option("header", "true")\
    .save("first_.csv")

Py4JJavaError: An error occurred while calling o384.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: data_sources/aws_crawl_00005/first_par_csv. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: data_sources/aws_crawl_00005/first_par_csv.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more


In [33]:
## Read the file

In [41]:
%%time
df_parq= pd.read_csv("data_sources/aws_crawl_00005/first_part_csv/part-00004-9c5d555c-6feb-4672-a481-a67c0bf94d0a-c000.csv")

ParserError: Error tokenizing data. C error: Expected 16 fields in line 3803, saw 17
