In [2]:
from pyspark.sql import SparkSession

In [3]:
# Connect to the Spark master at spark://spark-master:7077 under the app name
# 'load_save'; getOrCreate() hands back the resulting session object.
session = (
    SparkSession.builder
    .master('spark://spark-master:7077')
    .appName('load_save')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/03 12:02:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [12]:
# Read the sample JSON file through the generic reader, naming the format explicitly.
people = session.read.format('json').load('/opt/bitnami/spark/data/people.json')
people.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Default parquet write

In [5]:
# No format is given here, so the writer falls back to the session's default
# data source (parquet, per the section heading).
people.write.save(path='/opt/bitnami/spark/data/temp/people_o.parquet')

                                                                                

#### Write files with format

In [15]:
# Same write as above, but with the output format spelled out explicitly.
people.write.format('parquet').save('/opt/bitnami/spark/data/temp/people.parquet')

In [33]:
# Read the freshly written parquet output back in to confirm the round trip.
people_temp = (
    session.read
    .format('parquet')
    .load('/opt/bitnami/spark/data/temp/people.parquet')
)
people_temp.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Write files with extra options

In [20]:
# Write `people` as parquet while passing extra writer options.
# The '#name' suffix scopes the bloom-filter settings to the `name` column.
(
    people.write.format('parquet')
    .option('parquet.bloom.filter.enabled#name', 'true')
    .option('parquet.bloom.filter.expected.ndv#name', '1000000')
    # The key must be spelled 'parquet.enable.dictionary'; an unrecognised key
    # (such as the earlier 'dictinary' typo) is silently ignored.
    .option('parquet.enable.dictionary', 'true')
    .option('parquet.page.write-checksum.enabled', 'false')
    .save('/opt/bitnami/spark/data/temp/people_withoptions.parquet')
)

                                                                                

### Bucketing, Sorting and Partitioning

For file-based data sources, it is also possible to bucket and sort or partition the output. Bucketing and sorting are applicable only to persistent tables:

In [8]:
# Load the users dataset; its `name` and `favorite_color` columns drive the
# bucketing and partitioning examples below.
df = session.read.format('parquet').load("/opt/bitnami/spark/data/users.parquet")

In [14]:
# Bucketing: split the rows into 42 buckets keyed on `name`, sort by `age`,
# and persist the result as the table 'people_bucketed1'.
(
    people.write
    .bucketBy(42, 'name')
    .sortBy('age')
    .saveAsTable('people_bucketed1')
)

                                                                                

In [15]:
# Partitioning: write parquet output partitioned on `favorite_color`.
# NOTE(review): this path is relative, unlike the absolute temp paths used in
# the other cells — confirm where it resolves on the cluster.
(
    df.write
    .partitionBy('favorite_color')
    .format('parquet')
    .save('name_color_partition.parquet')
)

                                                                                

In [16]:
# Combine both techniques for one persistent table: partition on
# `favorite_color`, then bucket on `name` into 42 buckets.
users_writer = df.write.partitionBy('favorite_color').bucketBy(42, 'name')
users_writer.saveAsTable('users_partitioned_bucked')

                                                                                

### Save Modes


    Append: add additional data to an existing table
    Overwrite: remove the existing data from the table and replace it with new data
    Error (aka errorifexists): throw an error if the table exists and contains data
    Ignore: don’t write if the table already exists, but don’t throw an error either


In [26]:
# 'append': add these rows to whatever already exists at the target path.
people.write.save(
    '/opt/bitnami/spark/data/temp/people.parquet',
    format='parquet',
    mode='append',
)

In [28]:
# 'overwrite': replace the existing data at the target path with this DataFrame.
people.write.save(
    '/opt/bitnami/spark/data/temp/people.parquet',
    format='parquet',
    mode='overwrite',
)

In [32]:
# 'ignore': skip the write without raising when the target already exists.
people.write.save(
    '/opt/bitnami/spark/data/temp/people.parquet',
    format='parquet',
    mode='ignore',
)

In [31]:
# 'errorifexists' is the default save mode. The target path was already written by
# earlier cells, so this call is expected to fail with
# AnalysisException [PATH_ALREADY_EXISTS] (see the output below).
people.write.mode('errorifexists').format('parquet').save('/opt/bitnami/spark/data/temp/people.parquet')

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/opt/bitnami/spark/data/temp/people.parquet already exists. Set mode as "overwrite" to overwrite the existing path.