In [1]:
import findspark
findspark.init()
import pyspark
import random

from pyspark.sql import SparkSession

sc = pyspark.SparkContext(appName="File_Formats")
spark = SparkSession(sc)


# File formats

## Introduction

PySpark supports the following file formats:

__1. Text files__: PySpark allows you to read and write text files (e.g. CSV, TSV, JSON), which can be read using the SparkContext textFile() method.

__2. Sequence files__: A Sequence File is a binary format file containing key-value pairs. PySpark also allows reading and writing Sequence Files.

__3. Avro__: Avro is a serialized data format that also supports key-value pairs. PySpark also supports reading and writing Avro files.

__4. Parquet__: Parquet is a binary file format that specializes in column-based data storage. PySpark supports both reading and writing Parquet files.

__5. ORC__: ORC (Optimized Row Columnar) is another column-based data storage format. PySpark supports both reading and writing ORC files.

__6. Hadoop InputFormat__: PySpark also supports Hadoop InputFormat, which allows users to write their own input files for their desired file formats.



In addition, PySpark supports all Hadoop supported file formats such as HBase, Cassandra, MongoDB, etc.



## 1. Példafeladatok

### CSV

### Parameters to use when scanning:
    

path: path or pathname of the file(s) to be read. This can be the path to a single file or to an entire directory where the files are located.

sep or delimiter: The field delimiter character. The default value is a comma (,), but other characters can be used, such as a tab (\t).

header: Indicates whether the first line of the CSV file contains the column names. Default value is false.

inferSchema: Indicates whether Spark automatically guesses the field type. Default value is false.

quote: The character used by the fields in the CSV file that are delimited by quotation marks. Defaults to double quotes ("), but other characters can be used.

escape: The escape character to be used before quotation marks or characters with escape characters. The default is double quotes ("), but other characters may be used.

encoding: The character encoding of the file. The default is UTF-8, but other encodings may be used, for example ISO-8859-1.

comment: The character used for comments in the CSV file. The default value is null, which means that there is no comment.

nullValue: The string represented by the 'null' values used in the file. Default value is null.

In [2]:
%%sh
echo "%%sh - to indicate when you would run a shell script"

# Nem szükséges könyvtárak, fájlok törlése

rm -rf /home/student/random_csv
rm -rf /home/student/titanic_append_csv
rm -rf /home/student/titanic_parquet/
rm -rf /home/student/titanic_parquet_append/

echo
echo "A fájlformátumok anyag miatt esetlegesen már létrehozott teszt fájlok és mappák törölve"

Couldn't find program: 'sh'


In [2]:
# Scanning without parameterisation


# Let's read the CSV file
csv_file = spark.read.format("csv").load("titanic.csv")

csv_file.show()
csv_file.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|        _c0|     _c1|   _c2|                 _c3|   _c4| _c5|  _c6|  _c7|             _c8|    _c9| _c10|    _c11|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|      

In [5]:
# Scanning using a header


# Let's read the CSV file
csv_file = spark.read.format("csv").option("header", "true").load("titanic.csv")

csv_file.show()
csv_file.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [7]:
# Scanning using header and inferschema


# Let's read the CSV file
csv_file_with_schema = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("titanic.csv")

csv_file_with_schema.show()
csv_file_with_schema.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [3]:
# Scan using another delimiter


# Let's read the CSV file
csv_file = spark.read.format("csv").option("delimiter", ";").load("titanic.csv")

csv_file.show()
csv_file.printSchema()

+--------------------+
|                 _c0|
+--------------------+
|PassengerId,Survi...|
|1,0,3,"Braund, Mr...|
|2,1,1,"Cumings, M...|
|3,1,3,"Heikkinen,...|
|4,1,1,"Futrelle, ...|
|5,0,3,"Allen, Mr....|
|6,0,3,"Moran, Mr....|
|7,0,1,"McCarthy, ...|
|8,0,3,"Palsson, M...|
|9,1,3,"Johnson, M...|
|10,1,2,"Nasser, M...|
|11,1,3,"Sandstrom...|
|12,1,1,"Bonnell, ...|
|13,0,3,"Saunderco...|
|14,0,3,"Andersson...|
|15,0,3,"Vestrom, ...|
|16,1,2,"Hewlett, ...|
|17,0,3,"Rice, Mas...|
|18,1,2,"Williams,...|
|19,0,3,"Vander Pl...|
+--------------------+
only showing top 20 rows

root
 |-- _c0: string (nullable = true)



In [4]:
# Generate random row and write a new CSV file

from pyspark.sql.functions import rand

# DataFrame of 10 rows with random numbers
df = spark.range(0, 10).withColumn("value", rand())

df.write.format("csv").option("header", "true").save("random_csv")


AnalysisException: [PATH_ALREADY_EXISTS] Path file:/C:/Users/erick/Documents/Doctoral School/DataBases 2023/Jupyter spark/random_csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [5]:
# Random row generation and modify CSV file by rewriting

from pyspark.sql.functions import rand

# DataFrame of 10 rows with random numbers
df = spark.range(0, 10).withColumn("value", rand())

df.write.format("csv").option("header", "true").mode('overwrite').save("random_csv")

In [11]:
# Generate random row and modify CSV file with append

from pyspark.sql.functions import rand

# DataFrame of 10 rows with random numbers
df = spark.range(0, 10).withColumn("value", rand())

df.write.format("csv").option("header", "true").mode('append').save("random_csv")

In [12]:
# Scan existing data and then write a file with append

# load a data frame from file
df = spark.read.format("csv").load("titanic.csv")

df.write.format("csv").option("header", "true").mode('append').save("titanic_append_csv")

# Once you've run it a few times, it's worth checking the contents in the file manager



### JSON

In [13]:
# Scanning 


# Let's read the JSON file
json_file = spark.read.format("json").load("titanic.json")
json_file.show()
json_file.printSchema()

+---+-----+--------+-------+------------------+--------------+-----------+--------------------+------+------------------+--------+----------------+
|Age|Cabin|Embarked|   Fare|ParentsAndChildren|PassengerClass|PassengerId|       PassengerName|   Sex|SiblingsAndSpouses|Survived|          Ticket|
+---+-----+--------+-------+------------------+--------------+-----------+--------------------+------+------------------+--------+----------------+
| 22|     |       S|   7.25|                 0|             3|          1|Braund, Mr. Owen ...|  male|                 1|       0|       A/5 21171|
| 38|  C85|       C|71.2833|                 0|             1|          2|Cumings, Mrs. Joh...|female|                 1|       1|        PC 17599|
| 26|     |       S|  7.925|                 0|             3|          3|Heikkinen, Miss. ...|female|                 0|       1|STON/O2. 3101282|
| 35| C123|       S|   53.1|                 0|             1|          4|Futrelle, Mrs. Ja...|female|          

### Parquet

In [14]:
# Kiiratás
# Write the dataframe from the CSV to Parquet format

csv_file_with_schema.write.format("parquet").save("titanic_parquet/")

In [15]:
# Let's run it again

csv_file_with_schema.write.format("parquet").save("titanic_parquet/")

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/C:/Users/erick/Documents/Doctoral School/DataBases 2023/Jupyter spark/titanic_parquet already exists. Set mode as "overwrite" to overwrite the existing path.

In [16]:
# So we can overwrite

csv_file_with_schema.write.format("parquet").mode("overwrite").save("titanic_parquet/")

In [17]:
# How to add the data

csv_file_with_schema.write.format("parquet").mode("append").save("titanic_parquet_append/")

In [18]:
# Let's scan the data

df_parquet = spark.read.format("parquet").load("titanic_parquet")

df_parquet.show()
df_parquet.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

## 2. Practice

Extract the contents of the /home/student directory

In [6]:
%%

ls /home/student

UsageError: Cell magic `%%` not found.


### CSV

In [22]:
# Read the CSV file and display its contents 1.
csv_file = spark.read..load("titanic.csv")

csv_file.show()
csv_file.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|        _c0|     _c1|   _c2|                 _c3|   _c4| _c5|  _c6|  _c7|             _c8|    _c9| _c10|    _c11|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|      

In [23]:
# Read the CSV file and display its contents 2.
csv_file = 
csv_file.show()
csv_file.printSchema()

SyntaxError: invalid syntax (1198416730.py, line 2)

In [None]:
# Read the CSV file and display its contents 1. 

csv_file = spark.read.format("csv").load("titanic.csv")

csv_file.show()
csv_file.printSchema()

In [24]:
# Read the CSV file and display its contents: debug exercise 2: 
csv_file = spark.read.format("parquet").load("titanic_parquet")

csv_file.show()
csv_file.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [None]:
# Olvassuk be a CSV fájlt, és jelenítsük meg a tartalmát: Hibakeresés feladat 3: 
csv_file = spark.read.format("csv").save("titanic.csv")

csv_file.show()
csv_file.printSchema()

In [7]:
# Scanning using header and schema identification 1.

# Read the CSV file
csv_file_with_schema = spark.read.format("csv") \
.option("", "true") \
.option("", "true") \
.load("titanic.csv")

In [None]:
# Scanning using header and schema identification 2.


# Read the CSV file
csv_file_with_schema = spark.read.format("csv") \


.load("titanic.csv")

### JSON

In [None]:
# Let's read the JSON file. 1.
json_file = spark.read.format("").load("/home/student/titanic.json")
json_file.show()
json_file.printSchema()

In [None]:
# Let's read the JSON file. 2.
json_file = spark.read..load("/home/student/titanic.json")
json_file.show()
json_file.printSchema()

In [None]:
# Let's read the JSON file. 3.
json_file = spark.read.  ("/home/student/titanic.json")
json_file.show()
json_file.printSchema()

In [None]:
# Let's read the JSON file. 4.
json_file = spark.   ("/home/student/titanic.json")
json_file.show()
json_file.printSchema()

### Parquet

In [None]:
#Import Parquet file 

df_parquet = spark.read.format("").load("titanic_parquet")

df_parquet.show()
df_parquet.printSchema()

In [None]:
#Import Parquet file

df_parquet = spark.read..load("titanic_parquet")

df_parquet.show()
df_parquet.printSchema()

In [None]:
#Import Parquet file

df_parquet = spark..load("/home/student/titanic_parquet")

df_parquet.show()
df_parquet.printSchema()

In [None]:
#Import Parquet file

df_parquet = /home/student/titanic_parquet

df_parquet.show()
df_parquet.printSchema()

### Stop SparkContext

In [None]:
sc.stop()

## 3. Homework

Homework to look up and solve:
    
1. What other file formats does Spark support? What other other Spark features does Spark support? 
2. Kiiratáskor definiáljátok a különböző tömörítési eljárásokat.
3. what are the other methods of execution besides append and overwrite? Write an example task for this. 