## CSV

In [0]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, ArrayType

In [0]:
def read_csv(file_path, option_header=False, option_infer_schema=False, option_mode='PERMISSIVE', schema=None, **extra_options):
  """Read a CSV file into a Spark DataFrame using the notebook-global ``spark`` session.

  Args:
    file_path: Path handed to ``DataFrameReader.csv``.
    option_header: Value of the ``header`` reader option (first line holds column names).
    option_infer_schema: Value of the ``inferSchema`` reader option. Only matters when
      no ``schema`` is supplied; an explicit schema takes precedence.
    option_mode: Parse mode for malformed records. This notebook uses
      'PERMISSIVE', 'FAILFAST' and 'DROPMALFORMED'.
    schema: Optional explicit schema, e.g. a ``StructType``. A falsy value
      (None, the default) means no schema is set on the reader.
    **extra_options: Any further reader options (e.g. ``sep='|'``), each applied
      with ``DataFrameReader.option``.

  Returns:
    The DataFrame produced by ``DataFrameReader.csv``.
  """
  # Build the common reader once; the two branches previously duplicated this chain.
  reader = (
    spark.read
    .option('header', option_header)
    .option('inferSchema', option_infer_schema)
    .option('mode', option_mode)
  )
  for key, value in extra_options.items():
    reader = reader.option(key, value)
  # Truthiness check kept from the original: a falsy schema leaves the reader schema-less.
  if schema:
    reader = reader.schema(schema)
  return reader.csv(file_path)

# Read data.csv with a header row and inferred column types, then append the
# rows to csv_1.
df = read_csv('/FileStore/tables/data.csv', option_header=True, option_infer_schema=True)
df.show()
# Absolute path like every other path in this notebook. Without the leading '/'
# the path is resolved against the filesystem working directory, which is not
# guaranteed to be '/', so output could land outside /FileStore/tables.
df.write.csv('/FileStore/tables/csv_1', mode='append')

In [0]:
# Explicit schema for data.csv: a string Date column, six numeric columns, and
# `_corrupt_record`, which is Spark's default name for the column that receives
# the raw text of malformed rows in PERMISSIVE mode.
schema = StructType(
  [StructField('Date', StringType(), True)]
  + [
    StructField(column_name, DoubleType(), True)
    for column_name in ('Open', 'High', 'Low', 'Close', 'SharesTraded', 'Turnover')
  ]
  + [StructField('_corrupt_record', StringType(), True)]
)

df = read_csv('/FileStore/tables/data.csv', schema=schema, option_header=True)
df.show()
# One output directory per distinct Date value; existing output is replaced.
(
  df.write
  .partitionBy('Date')
  .csv('/FileStore/tables/csv_2', mode='overwrite')
)

In [0]:
# Same columns as the CSV schema, except Turnover is declared as IntegerType.
# NOTE(review): presumably this mismatch is deliberate, so that Turnover values
# in data.csv that are not integers produce malformed records for the
# PERMISSIVE / FAILFAST / DROPMALFORMED comparisons — confirm against the data.
error_schema = StructType(
  [StructField('Date', StringType(), True)]
  + [
    StructField(column_name, DoubleType(), True)
    for column_name in ('Open', 'High', 'Low', 'Close', 'SharesTraded')
  ]
  + [
    StructField('Turnover', IntegerType(), True),
    StructField('_corrupt_record', StringType(), True),
  ]
)

# Default PERMISSIVE mode.
df = read_csv('/FileStore/tables/data.csv', schema=error_schema, option_header=True)
df.show()

In [0]:
# FAILFAST: Spark raises on the first malformed record instead of keeping or
# dropping it, so show() is expected to fail if any row does not fit error_schema.
df = read_csv(
  '/FileStore/tables/data.csv',
  option_header=True,
  option_mode='FAILFAST',
  schema=error_schema,
)
df.show()

In [0]:
# DROPMALFORMED: rows that do not fit error_schema are silently discarded.
df = read_csv(
  '/FileStore/tables/data.csv',
  option_header=True,
  option_mode='DROPMALFORMED',
  schema=error_schema,
)
df.show()

## JSON

In [0]:
# `multiline` lets one JSON record span several lines (e.g. a pretty-printed file);
# the schema is inferred here.
df = (
  spark.read
  .option('multiline', True)
  .json('/FileStore/tables/sample-1.json')
)

In [0]:
# Print the first 20 rows (PySpark's default n) without truncating cell values,
# so nested structs/arrays are shown in full.
df.show(n=20, truncate=False)

In [0]:
# Element type shared by the batters.batter array and the topping array:
# a struct of two string fields, id and type.
_id_type_struct = StructType([
  StructField('id', StringType()),
  StructField('type', StringType()),
])

# Explicit nested schema for sample-1.json.
schema = StructType([
  StructField('id', StringType()),
  StructField('name', StringType()),
  StructField('ppu', DoubleType()),
  StructField('batters', StructType([StructField('batter', ArrayType(_id_type_struct))])),
  StructField('topping', ArrayType(_id_type_struct)),
])

In [0]:
# Re-read sample-1.json using the explicit nested schema instead of inference.
df = spark.read.option('multiline', True).schema(schema).json('/FileStore/tables/sample-1.json')
# bucketBy is only supported together with saveAsTable. Combined with a
# path-based writer such as .json(path) it raises an AnalysisException, so the
# write is partitioned by `name` only. To bucket by `ppu`, write a table instead:
#   df.write.format('json').partitionBy('name').bucketBy(4, 'ppu') \
#     .option('path', '/FileStore/tables/json_1').saveAsTable('json_1')
df.write.partitionBy('name').json('/FileStore/tables/json_1')

In [0]:
# Print the first 20 rows (PySpark's default n) without truncating nested values.
df.show(n=20, truncate=False)

## PARQUET

In [0]:
# mergeSchema asks Spark to reconcile the schemas of all Parquet files under par2.
df = spark.read.option('mergeSchema', True).parquet('/FileStore/tables/par2')
# Shuffle into 4 partitions before writing (at most 4 part files). The method
# is `repartition`; the original `reparition` raised AttributeError.
# mode='ignore' skips the write entirely if parquet_1 already exists.
df.repartition(4).write.parquet('/FileStore/tables/parquet_1', mode='ignore')

In [0]:
# Print the first 20 rows with long values truncated (PySpark's defaults).
df.show(n=20, truncate=True)

## JDBC

In [0]:
# Read the `employees` table from PostgreSQL over JDBC.
jdbcUrl = "jdbc:postgresql:dbserver"
dbTable = "employees"
# NOTE(review): placeholder credentials. Load real ones from a secret store
# (e.g. Databricks secrets) rather than hard-coding them in the notebook.
connectionProperties = {"user": "username", "password": "password"}
# `properties` must be passed by keyword: the third positional parameter of
# DataFrameReader.jdbc is `column` (the partitioning column). Passing the dict
# positionally makes Spark treat it as a column and fail on the missing
# lowerBound/upperBound.
df = spark.read.jdbc(jdbcUrl, dbTable, properties=connectionProperties)