#Create Spark Session

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Name").getOrCreate()
df = spark.read.csv("dbfs:/FileStore/Car_sales1.csv")
df.show()
# By default in Azure Databricks and Azure synapse spark object is present

+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|  _c0|         _c1|        _c2|               _c3|                _c4|         _c5|               _c6|        _c7|       _c8|      _c9| _c10|  _c11|       _c12|         _c13|           _c14|         _c15|             _c16|
+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|CarID|Manufacturer|      Model|Sales_in_thousands|__year_resale_value|Vehicle_type|Price_in_thousands|Engine_size|Horsepower|Wheelbase|Width|Length|Curb_weight|Fuel_capacity|Fuel_efficiency|Latest_Launch|Power_perf_factor|
|    1|       Acura|    Integra|            16.919|              16.36|   Passenger|              21.5| 

#Create DataFrame

In [0]:
#Method- 1(By default schema generate there datatype)
data = [(1,"Pranci",26),(2,"Tanna",24)]
df = spark.createDataFrame(data = data, schema = ['id','name','age'])
df.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|Pranci| 26|
|  2| Tanna| 24|
+---+------+---+



In [0]:
type(df) #to get the type of data

Out[3]: pyspark.sql.dataframe.DataFrame

In [0]:
dir(df) # to get all the list of properties, methods and Functions

Out[4]: ['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collect_as_arrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_joinAsOf',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_session',
 '_sort_cols',
 '_sql_ctx',
 '_support_repr_html',
 '_to_corrected_pandas_type',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'display',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'd

In [0]:
# Method- 2( we mention the datatype of schema)
from pyspark.sql.types import StructField,StructType, IntegerType,StringType
data = [(1,"Pranci",2000),(2,"Tanna",5000)]
sche = StructType([StructField(name = "id",dataType= IntegerType()),
                   StructField(name = "Name",dataType = StringType()),
                   StructField(name = "Salary",dataType = IntegerType())])
df = spark.createDataFrame(data = data, schema = sche)
df.show()

+---+------+------+
| id|  Name|Salary|
+---+------+------+
|  1|Pranci|  2000|
|  2| Tanna|  5000|
+---+------+------+



#Read CSV File

In [0]:
#Method 1 - Read single file
df = spark.read.csv("dbfs:/FileStore/Car_sales1.csv",inferSchema = True)
df.show()

+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|  _c0|         _c1|        _c2|               _c3|                _c4|         _c5|               _c6|        _c7|       _c8|      _c9| _c10|  _c11|       _c12|         _c13|           _c14|         _c15|             _c16|
+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|CarID|Manufacturer|      Model|Sales_in_thousands|__year_resale_value|Vehicle_type|Price_in_thousands|Engine_size|Horsepower|Wheelbase|Width|Length|Curb_weight|Fuel_capacity|Fuel_efficiency|Latest_Launch|Power_perf_factor|
|    1|       Acura|    Integra|            16.919|              16.36|   Passenger|              21.5| 

In [0]:
#Method 2 - Read single file
df = spark.read.format("csv").option(key = "header",value = True).load("dbfs:/FileStore/Car_sales1.csv",haeder = True)
df.show()

+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|CarID|Manufacturer|      Model|Sales_in_thousands|__year_resale_value|Vehicle_type|Price_in_thousands|Engine_size|Horsepower|Wheelbase|Width|Length|Curb_weight|Fuel_capacity|Fuel_efficiency|Latest_Launch|Power_perf_factor|
+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|    1|       Acura|    Integra|            16.919|              16.36|   Passenger|              21.5|        1.8|       140|    101.2| 67.3| 172.4|      2.639|         13.2|             28|     2/2/2012|      58.28014952|
|    2|       Acura|         TL|            39.384|             19.875|   Passenger|              28.4| 

In [0]:
#Method 3 - Read multiple file
df = spark.read.format("csv").option(key = "header",value = True).load(path = ["dbfs:/FileStore/Car_sales1.csv","dbfs:/FileStore/Car_sales2.csv"],haeder = True)
df.show()

+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|CarID|Manufacturer|      Model|Sales_in_thousands|__year_resale_value|Vehicle_type|Price_in_thousands|Engine_size|Horsepower|Wheelbase|Width|Length|Curb_weight|Fuel_capacity|Fuel_efficiency|Latest_Launch|Power_perf_factor|
+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|    1|       Acura|    Integra|            16.919|              16.36|   Passenger|              21.5|        1.8|       140|    101.2| 67.3| 172.4|      2.639|         13.2|             28|     2/2/2012|      58.28014952|
|    2|       Acura|         TL|            39.384|             19.875|   Passenger|              28.4| 

In [0]:
#Method 3 - Read all file having same path
df = spark.read.format("csv").option(key = "header",value = True).load(path = ["dbfs:/FileStore/Data"],haeder = True)
df.show()

+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|CarID|Manufacturer|      Model|Sales_in_thousands|__year_resale_value|Vehicle_type|Price_in_thousands|Engine_size|Horsepower|Wheelbase|Width|Length|Curb_weight|Fuel_capacity|Fuel_efficiency|Latest_Launch|Power_perf_factor|
+-----+------------+-----------+------------------+-------------------+------------+------------------+-----------+----------+---------+-----+------+-----------+-------------+---------------+-------------+-----------------+
|    1|       Acura|    Integra|            16.919|              16.36|   Passenger|              21.5|        1.8|       140|    101.2| 67.3| 172.4|      2.639|         13.2|             28|     2/2/2012|      58.28014952|
|    2|       Acura|         TL|            39.384|             19.875|   Passenger|              28.4| 

#Write CSV File

In [0]:
data = [(1,"Pranci",26),(2,"Tanna",24)]
df = spark.createDataFrame(data = data, schema = ['id','name','age'])
df.show()
df.write.csv(path= "dbfs:/FileStore/Data/Dummy",header = True,mode = "append")
# Modes:- append() - To add the data to the existing file
# overwrite() - To overwrite the data to the existing file
# ignore() - To Ignore write operation when the file already exist
# error() - default optio, when the file already exist, it return error

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|Pranci| 26|
|  2| Tanna| 24|
+---+------+---+



#Read JSON File

In [0]:
# Method 1
df1 = spark.read.json("dbfs:/FileStore/Data.json")
df1.show()

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|   PR|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|PASEO COSTA DEL SUR

In [0]:
# Method 2
df1 = spark.read.format("org.apache.spark.sql.json").load("dbfs:/FileStore/Data.json")
df1.show()

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|   PR|           null|      null|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|PASEO COSTA DEL SUR

In [0]:
# Method 3 - For multiline files
df = spark.read.option("multiline","True").json("dbfs:/FileStore/Data.json")
df.show()

+-----------+-------+-------------+-----+--------------------+---------------+--------------+------+------------+-----+-----------+-----+-----+-----+-----------+-------+
|       City|Country|Decommisioned|  Lat|            Location|   LocationText|  LocationType|  Long|RecordNumber|State|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-----------+-------+-------------+-----+--------------------+---------------+--------------+------+------------+-----+-----------+-----+-----+-----+-----------+-------+
|PARC PARQUE|     US|        false|17.96|NA-US-PR-PARC PARQUE|Parc Parque, PR|NOT ACCEPTABLE|-66.22|           1|   PR|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
+-----------+-------+-------------+-----+--------------------+---------------+--------------+------+------------+-----+-----------+-----+-----+-----+-----------+-------+



#Write JSON data into Dataframe

In [0]:
data = [(1,"Pranci"),(2,"Tanna")]
schema = ["id","name"]
df = spark.createDataFrame(data = data, schema = schema)
df.write.json("dbfs:/FileStore/SampleData.json")