In [0]:
# Writing Data to Files
import datetime
import pyspark.sql.functions as F
from pyspark.sql import Row
import pandas as pd
from pyspark.sql.types import *

In [0]:
# List the contents of the retail_db dataset root (dbutils form of `%fs ls`).
display(dbutils.fs.ls('/public/retail_db'))

In [0]:
# List the files inside the orders folder (dbutils form of `%fs ls`).
display(dbutils.fs.ls('/public/retail_db/orders'))

In [0]:
# Load the comma-separated orders data, let Spark infer the column types,
# and replace the auto-generated column names with meaningful ones.
orders = (
    spark.read
    .csv('/public/retail_db/orders', inferSchema=True)
    .toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
)

In [0]:
# Show which files back the orders DataFrame.
orders.inputFiles()

In [0]:
# Inspect the column names together with the types produced by inferSchema=True.
orders.dtypes

In [0]:
# Preview the orders data.
orders.show()

In [0]:
# Resolve the login name of the current user; it is interpolated into the
# /user/{username}/... output paths used by the write examples below.
import getpass
username = getpass.getuser()

In [0]:
# Sample course catalogue used by the write examples. Each record mixes
# int, string, date, boolean and timestamp values.
courses = [
    dict(
        course_id=1,
        course_title='Python for Data Science',
        course_published_dt=datetime.date(2020, 1, 1),
        is_active=True,
        last_updated_ts=datetime.datetime(2020, 1, 1, 10, 10, 10),
    ),
    dict(
        course_id=2,
        course_title='Data Engineering with Apache Spark',
        course_published_dt=datetime.date(2020, 2, 1),
        is_active=True,
        last_updated_ts=datetime.datetime(2020, 2, 1, 10, 10, 10),
    ),
    dict(
        course_id=3,
        course_title='Data Science with Python',
        course_published_dt=datetime.date(2020, 3, 1),
        is_active=True,
        last_updated_ts=datetime.datetime(2020, 3, 1, 10, 10, 10),
    ),
    dict(
        course_id=4,
        course_title='Data Science with R',
        course_published_dt=datetime.date(2020, 4, 1),
        is_active=False,
        last_updated_ts=datetime.datetime(2020, 4, 1, 10, 10, 10),
    ),
]

# Build one Row per record (keys become column names) and create the DataFrame.
courses_df = spark.createDataFrame([Row(**record) for record in courses])

In [0]:
# Display the built-in documentation for the writer object exposed as courses_df.write.
help(courses_df.write)

In [0]:
# Write courses_df as JSON under the user's directory using the json() shortcut;
# mode='overwrite' replaces any output already present at that path.
courses_df.write.json(f'/user/{username}/courses', mode='overwrite')

In [0]:
# List the files that now exist in the courses output folder.
dbutils.fs.ls(f'/user/{username}/courses')

In [0]:
# Same kind of JSON write expressed through the generic format(...).save(...) API,
# this time targeting the courses_json folder.
courses_df.write.format('json').save(f'/user/{username}/courses_json', mode='overwrite')

In [0]:
# Preview the courses data before writing it in other formats.
courses_df.show()

In [0]:
# Inspect the column names and data types of courses_df.
courses_df.dtypes

In [0]:
# Write courses_df as CSV using the csv() shortcut, replacing any existing output.
courses_df.write.csv(f'/user/{username}/courses_csv', mode='overwrite')

In [0]:
# List the files that now exist in the courses_csv output folder.
dbutils.fs.ls(f'/user/{username}/courses_csv')

In [0]:
# Same CSV write expressed through the generic format(...).save(...) API.
# The path has to be an f-string: without the `f` prefix the data is written to a
# directory literally named '/user/{username}/courses_csv' instead of the user's folder.
courses_df.write.format('csv').save(f'/user/{username}/courses_csv', mode='overwrite')

In [0]:
# Read the CSV output back as plain text lines to see exactly what was written;
# truncate=False keeps long lines from being cut off in the display.
spark.read.text(f'/user/{username}/courses_csv').show(truncate=False)

In [0]:
# Reduce the DataFrame to a single partition before writing and include a
# header line with the column names in the CSV output.
courses_df.coalesce(1).write.csv(f'/user/{username}/courses_csv', mode='overwrite', header=True)

In [0]:
# List the files in the courses_csv output folder after the single-partition write.
# The path has to be an f-string: without the `f` prefix this lists the literal
# '/user/{username}/courses_csv' path rather than the current user's folder.
dbutils.fs.ls(f'/user/{username}/courses_csv')

In [0]:
# Read the CSV output back as plain text to confirm the header line is present.
spark.read.text(f'/user/{username}/courses_csv').show(truncate=False)

In [0]:
# Write a single-partition CSV with a header line, compressed with gzip.
courses_df.coalesce(1).write.csv(f'/user/{username}/courses_csv_compressed', mode='overwrite', header=True, compression='gzip')

In [0]:
# Same target folder via format(...).save(...), now with snappy compression;
# because the mode is 'overwrite', this replaces the gzip output written to this path.
courses_df.coalesce(1).write.format('csv').save(f'/user/{username}/courses_csv_compressed', mode='overwrite', header=True, compression='snappy')

In [0]:
# Source folder holding the comma-separated retail_db data sets.
input_dir = '/public/retail_db'
# Per-user target folder that will hold the pipe-separated copies.
output_dir = f'/user/{username}/retail_db_pipe'

In [0]:
# Convert every data folder under input_dir from comma separated to pipe
# separated files, writing one output folder per source folder.
for file_details in dbutils.fs.ls(input_dir):
    source_path = file_details.path
    # Skip git metadata and SQL scripts; only data folders are converted.
    if '.git' in source_path or source_path.endswith('.sql'):
        continue
    print(f'Converting data in {source_path} folder from comma separated to pipe separated')
    df = spark.read.csv(source_path)
    # The second-to-last path component is used as the output folder name.
    folder_name = source_path.split('/')[-2]
    df.coalesce(1).write.mode('overwrite').csv(f'{output_dir}/{folder_name}', sep='|')

In [0]:
# Re-load orders from the pipe-separated copy, supplying an explicit schema
# instead of relying on schema inference.
orders = (
    spark.read
    .schema('order_id INT, order_date TIMESTAMP, order_customer_id INT, order_status STRING')
    .csv(f'/user/{username}/retail_db_pipe/orders', sep='|')
)

In [0]:
# Preview orders as read from the pipe-separated files.
orders.show()

In [0]:
# Show the documentation for the writer's option() method (one option per call).
help(orders.write.option)

In [0]:
# Show the documentation for the writer's options() method (several options in one call).
help(orders.write.options)

In [0]:
# Write orders as a single gzip-compressed, pipe-separated CSV with a header,
# passing each setting through its own option() call.
# The target is a dedicated folder: `orders` is read lazily from
# {output_dir}/orders, so overwriting that same folder can remove the input
# files before Spark has scanned them.
orders.coalesce(1).write.mode('overwrite') \
    .option('compression', 'gzip') \
    .option('header', True) \
    .option('sep', '|') \
    .csv(f'{output_dir}/orders_gzip')

In [0]:
# Writer settings collected in one mapping so they can be passed with options(**options).
options = dict(sep='|', header=True, compression='snappy')

In [0]:
# Write orders as a single CSV file, supplying all writer settings at once by
# unpacking the `options` dictionary into options().
# The target is a dedicated folder: `orders` is read lazily from
# {output_dir}/orders, so overwriting that same folder can remove the input
# files before Spark has scanned them.
orders.coalesce(1).write.mode('overwrite') \
    .options(**options) \
    .csv(f'{output_dir}/orders_snappy')

In [0]:
# Write courses_df as JSON from a single partition using the json() shortcut.
courses_df.coalesce(1).write.json(f'/user/{username}/courses_json', mode='overwrite')

In [0]:
# Same single-partition JSON write through the generic format(...).save(...) API.
courses_df.coalesce(1).write.format('json').save(f'/user/{username}/courses_json', mode='overwrite')

In [0]:
# Single-partition JSON write with snappy compression, replacing the earlier courses_json output.
courses_df.coalesce(1).write.json(f'/user/{username}/courses_json', mode='overwrite', compression='snappy')

In [0]:
# Write courses_df as Parquet using the parquet() shortcut, replacing any existing output.
courses_df.write.parquet(f'/user/{username}/courses_parquet', mode='overwrite')

In [0]:
# Same Parquet write through the generic format(...).save(...) API.
courses_df.write.format('parquet').save(f'/user/{username}/courses_parquet', mode='overwrite')

In [0]:
# Parquet write with the compression codec stated explicitly as snappy.
courses_df.write.parquet(f'/user/{username}/courses_parquet', mode='overwrite', compression='snappy')

In [0]:
# Different Modes
# Show the documentation for the writer's mode() method, which lists the supported save modes.
help(courses_df.write.mode)

In [0]:
# courses_df.write.mode(saveMode).file_format(path_to_folder)
# courses_df.write.file_format(path_to_folder, mode=saveMode)
# courses_df.write.mode(saveMode).format('file_format').save(path_to_folder)
# courses_df.write.format('file_format').save(path_to_folder, mode=saveMode)

In [0]:
# mode='append' adds new files to the existing courses_parquet folder instead of replacing it.
courses_df.write.parquet(f'/user/{username}/courses_parquet', mode='append', compression='snappy')

In [0]:
# Show the documentation for DataFrame.coalesce.
help(courses_df.coalesce)

In [0]:
# Show the documentation for DataFrame.repartition.
help(courses_df.repartition)

In [0]:
# Report how many partitions currently back courses_df.
courses_df.rdd.getNumPartitions()

In [0]:
# Partition count after coalesce(16). Uses courses_df explicitly rather than
# `df`, the variable left over from the conversion loop, whose contents depend
# on whichever folder the loop happened to process last.
courses_df.coalesce(16).rdd.getNumPartitions()

In [0]:
# Partition count after repartition(16). Uses courses_df explicitly rather than
# `df`, the variable left over from the conversion loop, whose contents depend
# on whichever folder the loop happened to process last.
courses_df.repartition(16).rdd.getNumPartitions()

In [0]:
# Repartition by column values and report the resulting partition count.
# courses_df has no 'Year' or 'Month' columns, so both partitioning keys are
# derived from course_published_dt instead of referencing columns that do not exist.
courses_df.repartition(186, F.year('course_published_dt'), F.month('course_published_dt')).rdd.getNumPartitions()