Based on the [WafaStudies](https://www.youtube.com/@WafaStudies) PySpark [tutorial](https://www.youtube.com/playlist?list=PLMWaZteqtEaJFiJ2FyIKK0YEuXwQ9YIS_).

## Imports

In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip -q install findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder\
                    .appName('Spark')\
                    .master("local[*]")\
                    .getOrCreate()

from pyspark.sql import dataframe

## Writing dataframe to CSV files

In [5]:
help(dataframe.DataFrame.write)

Help on property:

    Interface for saving the content of the non-streaming :class:`DataFrame` out into external
    storage.
    
    .. versionadded:: 1.4.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Returns
    -------
    :class:`DataFrameWriter`
    
    Examples
    --------
    >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
    >>> type(df.write)
    <class '...readwriter.DataFrameWriter'>
    
    Write the DataFrame as a table.
    
    >>> _ = spark.sql("DROP TABLE IF EXISTS tab2")
    >>> df.write.saveAsTable("tab2")
    >>> _ = spark.sql("DROP TABLE tab2")



Let's start by creating a dataframe:

In [6]:
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

df.show()
df.printSchema()

+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
help(df.write.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path: str, mode: Optional[str] = None, compression: Optional[str] = None, sep: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, header: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, escapeQuotes: Union[bool, str, NoneType] = None, quoteAll: Union[bool, str, NoneType] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, charToEscapeQuoteEscaping: Optional[str] = None, encoding: Optional[str] = None, emptyValue: Optional[str] = None, lineSep: Optional[str] = None) -> None method of pyspark.sql.readwriter.DataFrameWriter instance
    Saves the content of the :class:`DataFrame` in CSV format at the specified path.
    
    .. versionadded:: 2.0.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
 

In [8]:
df.write\
.csv("df_csv")

Let's check the written file:

In [9]:
spark.read.schema(df.schema)\
.format("csv")\
.load("df_csv").show()

+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+



What if we want to write a different dataframe to the same path?

In [10]:
df = spark.createDataFrame([(1, "Goku"), (2, "Naruto")], schema=["id", "name"])

df.show()
df.printSchema()

+---+------+
| id|  name|
+---+------+
|  1|  Goku|
|  2|Naruto|
+---+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [11]:
df.write\
.csv("df_csv")

AnalysisException: ignored

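This raises an error because the output path already exists and the default save mode (```error```, also spelled ```errorifexists```) refuses to write to an existing path, so we need to overwrite it: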

In [12]:
df.write\
.csv("df_csv", mode="overwrite")

Let's check:

In [13]:
spark.read.schema(df.schema)\
.format("csv")\
.load("df_csv").show()

+---+------+
| id|  name|
+---+------+
|  2|Naruto|
|  1|  Goku|
+---+------+



And how do we add more rows to the existing output?

Let's create another dataframe:

In [14]:
df2 = spark.createDataFrame([("3", "Gojo"), ("4", "Kirito")], schema=["id", "name"])

df2.show()
df2.printSchema()

+---+------+
| id|  name|
+---+------+
|  3|  Gojo|
|  4|Kirito|
+---+------+

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



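Then we append it to the existing output. Note that ```id``` is a string in ```df2```; CSV stores plain text, so the values are still read back as ```long``` when we pass ```df.schema```: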

In [15]:
df2.write\
.csv("df_csv", mode="append")

In [16]:
spark.read.schema(df.schema)\
.format("csv")\
.load("df_csv").show()

+---+------+
| id|  name|
+---+------+
|  2|Naruto|
|  4|Kirito|
|  1|  Goku|
|  3|  Gojo|
+---+------+



### File structure:

Let's check the structure of the CSV output:

In [17]:
!file -b df_csv

directory


It's a folder. Let's check its contents:

In [18]:
!ls -la df_csv

total 44
drwxr-xr-x 2 root root 4096 Oct  9 11:20 .
drwxr-xr-x 1 root root 4096 Oct  9 11:20 ..
-rw-r--r-- 1 root root    7 Oct  9 11:20 part-00000-3c472559-2b7c-40b9-97ca-2c678d1510a1-c000.csv
-rw-r--r-- 1 root root   12 Oct  9 11:20 .part-00000-3c472559-2b7c-40b9-97ca-2c678d1510a1-c000.csv.crc
-rw-r--r-- 1 root root    7 Oct  9 11:20 part-00000-acb262e5-9c95-4ef3-8542-ac136f339622-c000.csv
-rw-r--r-- 1 root root   12 Oct  9 11:20 .part-00000-acb262e5-9c95-4ef3-8542-ac136f339622-c000.csv.crc
-rw-r--r-- 1 root root    9 Oct  9 11:20 part-00001-3c472559-2b7c-40b9-97ca-2c678d1510a1-c000.csv
-rw-r--r-- 1 root root   12 Oct  9 11:20 .part-00001-3c472559-2b7c-40b9-97ca-2c678d1510a1-c000.csv.crc
-rw-r--r-- 1 root root    9 Oct  9 11:20 part-00001-acb262e5-9c95-4ef3-8542-ac136f339622-c000.csv
-rw-r--r-- 1 root root   12 Oct  9 11:20 .part-00001-acb262e5-9c95-4ef3-8542-ac136f339622-c000.csv.crc
-rw-r--r-- 1 root root    0 Oct  9 11:20 _SUCCESS
-rw-r--r-- 1 root root    8 Oct  9 11:20 ._SUCCESS

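The output is divided into part files, typically one per non-empty partition of the dataframe that was written.

There are four of them here: two from the ```overwrite``` write and two from the ```append``` write (note the two different UUIDs in the file names). Each write produced two part files with one row apiece. The number of files follows the number of partitions, not the number of rows; the two match here only because each row landed in its own partition.

This happens because Spark has a ```driver node``` that divides the workload between ```worker nodes```, and each task writes its own partition to a separate file, like: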

      Driver Node
     /   |    |   \
    W1   W2   W3   W4


We can also specify the number of partitions we want:

In [19]:
help(df.repartition)

Help on method repartition in module pyspark.sql.dataframe:

repartition(numPartitions: Union[int, ForwardRef('ColumnOrName')], *cols: 'ColumnOrName') -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
    Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
    resulting :class:`DataFrame` is hash partitioned.
    
    .. versionadded:: 1.3.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    numPartitions : int
        can be an int to specify the target number of partitions or a Column.
        If it is a Column, it will be used as the first partitioning column. If not specified,
        the default number of partitions is used.
    cols : str or :class:`Column`
        partitioning columns.
    
        .. versionchanged:: 1.6.0
           Added optional arguments to specify the partitioning columns. Also made numPartitions
           optional if partitioning columns are specifi

In [20]:
df = spark.createDataFrame([(1, "Goku"), (2, "Naruto")], schema=["id", "name"])

df.show()
df.printSchema()

+---+------+
| id|  name|
+---+------+
|  1|  Goku|
|  2|Naruto|
+---+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [21]:
df_1part = df.repartition(1)
df_1part.rdd.getNumPartitions()

1

In [22]:
df_1part\
.write\
.csv("df_csv_1part", mode="overwrite", header=True)

In [23]:
spark.read\
.option("header", True)\
.format("csv")\
.load("df_csv_1part").show()

+---+------+
| id|  name|
+---+------+
|  1|  Goku|
|  2|Naruto|
+---+------+



In [24]:
!ls -la df_csv_1part/

total 20
drwxr-xr-x 2 root root 4096 Oct  9 11:20 .
drwxr-xr-x 1 root root 4096 Oct  9 11:20 ..
-rw-r--r-- 1 root root   24 Oct  9 11:20 part-00000-c2d5b4b6-26cb-44db-94ff-cd659fe52fc8-c000.csv
-rw-r--r-- 1 root root   12 Oct  9 11:20 .part-00000-c2d5b4b6-26cb-44db-94ff-cd659fe52fc8-c000.csv.crc
-rw-r--r-- 1 root root    0 Oct  9 11:20 _SUCCESS
-rw-r--r-- 1 root root    8 Oct  9 11:20 ._SUCCESS.crc
