In [None]:
A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. 
You can think of a DataFrame as a spreadsheet, a SQL table, or a dictionary of Series objects.
Apache Spark DataFrames provide a rich set of functions (select columns, filter, join, aggregate) that allow you to solve common data analysis problems efficiently.

Apache Spark DataFrames are an abstraction built on top of Resilient Distributed Datasets (RDDs). 
Spark DataFrames and Spark SQL use a unified planning and optimization engine, allowing you to get nearly identical performance across all supported languages on Databricks (Python, SQL, Scala, and R).

In [None]:
from pyspark import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Build (or reuse) the session used by every cell below:
# master 'local[4]', application name 'demo'.
spark = (
    SparkSession.builder
    .master('local[4]')
    .appName('demo')
    .getOrCreate()
)

In [None]:
# Sample book-sales records. Note that the sold counts are kept as strings.
cols = ("book", "counts_sold")
rows = [
    ("then there were none", "2005400"),
    ("in cold blood", "1500000"),
    ("the hobbit", "30000000"),
]

# Turn the local list into an RDD.
rdd = spark.sparkContext.parallelize(rows)

In [None]:
# No schema is supplied, so the columns get the default names _1 and _2.
df_rdd = spark.createDataFrame(data=rdd)
df_rdd.collect()

Out[3]: [Row(_1='then there were none', _2='2005400'),
 Row(_1='in cold blood', _2='1500000'),
 Row(_1='the hobbit', _2='30000000')]

In [None]:
# Rename the default _1/_2 columns. The names come from `cols`, which is
# defined together with the sample rows, so they are declared in one place
# instead of being repeated here as literals.
df_rdd2 = spark.createDataFrame(rdd).toDF(*cols)
df_rdd2.collect()

Out[4]: [Row(book='then there were none', counts_sold='2005400'),
 Row(book='in cold blood', counts_sold='1500000'),
 Row(book='the hobbit', counts_sold='30000000')]

In [None]:
# A plain Python list of tuples can be passed directly as well;
# without a schema the columns are again named _1 and _2.
values = [
    ("thomas_jefferson", 42),
    ("alpa_cino", 23),
    ("baldur", 43),
    ("rollo", 54),
]
spark.createDataFrame(data=values).collect()

Out[3]: [Row(_1='thomas_jefferson', _2=42),
 Row(_1='alpa_cino', _2=23),
 Row(_1='baldur', _2=43),
 Row(_1='rollo', _2=54)]

In [None]:
# A list of dicts: the dict keys become the column names.
values = [dict(name='skarsgard', age=23, nationality='norway')]
df = spark.createDataFrame(data=values)
display(df)

age,name,nationality
23,skarsgard,norway


In [None]:
from pyspark.context import SparkContext
sc = spark.sparkContext

# `column` is declared but not passed to createDataFrame in this cell,
# so the resulting DataFrame keeps the default names _1, _2, _3.
column = ["city", "code", "population"]
rows = [
    ("Sweden", 2101, 120321),
    ("finland", 2102, 120331),
    ("norway", 2013, 123123),
]

rdd = sc.parallelize(rows)
df_from_rdd = spark.createDataFrame(data=rdd)
display(df_from_rdd)

_1,_2,_3
Sweden,2101,120321
finland,2102,120331
norway,2013,123123


In [None]:
from pyspark.sql.types import *
# Explicit schema: one nullable field per tuple position of the RDD rows.
schema = (
    StructType()
    .add("city", StringType(), True)
    .add("code", IntegerType(), True)
    .add("population", IntegerType(), True)
)

# Same RDD as before, now with named and typed columns.
df1 = spark.createDataFrame(rdd, schema=schema)

In [None]:
# Render the DataFrame that was created with the explicit schema.
display(df1)

city,code,population
Sweden,2101,120321
finland,2102,120331
norway,2013,123123


In [None]:
# Comma-delimited CSV whose first line holds the column names.
# No schema is given or inferred here, so every column is read as a string.
path = "/FileStore/tables/Datas/Student_Details.csv"
df_csv = (
    spark.read
    .option("header", True)
    .option("delimiter", ",")
    .csv(path)
)

In [None]:
# Render the rows read from the CSV file.
display(df_csv)

gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
female,group B,bachelor's degree,standard,none,72,72,74
female,group C,some college,standard,completed,69,90,88
female,group B,master's degree,standard,none,90,95,93
male,group A,associate's degree,free/reduced,none,47,57,44
male,group C,some college,standard,none,76,78,75
female,group B,associate's degree,standard,none,71,83,78
female,group B,some college,standard,completed,88,95,92
male,group B,some college,free/reduced,none,40,43,39
male,group D,high school,free/reduced,completed,64,64,67
female,group B,high school,free/reduced,none,38,60,50


In [None]:
# Check the Python type of the object returned by the CSV reader.
type(df_csv)

Out[11]: pyspark.sql.dataframe.DataFrame

In [None]:
path = "/FileStore/tables/Datas/ghtorrent_logs.txt"
# The text source reads each record into a single string column and has no
# concept of a header row, so the `header` option that used to be set here
# had no effect and has been removed.
# NOTE(review): lineSep="," makes every comma-separated fragment its own
# row instead of splitting on newlines - confirm this is what is wanted
# for a log file.
df_text = spark.read.option('lineSep', ",").text(path)

In [None]:
# Render the records read by the text reader.
display(df_text)

In [None]:
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

# The reader below needs `final_schema`, so the schema is declared in this
# cell, before the read. Without it the cell raises NameError on a fresh
# kernel (Restart & Run All), because nothing above defines the name.

# One element of the top-level "data" array.
data_schema = StructType([
    StructField("ID Nation", StringType(), True),
    StructField("Nation", StringType(), True),
    StructField("ID Year", LongType(), True),
    StructField("Year", StringType(), True),
    StructField("Population", LongType(), True),
    StructField("Slug Nation", StringType(), True),
])

# The "annotations" object nested inside each "source" element.
annotation_schema = StructType([
    StructField("source_names", StringType(), True),
    StructField("source_description", StringType(), True),
    StructField("dataset_name", StringType(), True),
    StructField("dataset_link", StringType(), True),
    StructField("table_id", StringType(), True),
    StructField("topic", StringType(), True),
    StructField("subtopic", StringType(), True),
])

# One element of the top-level "source" array. Every field is nullable:
# the value read for `measure` from this file is null.
source_schema = StructType([
    StructField("annotations", annotation_schema, True),
    StructField("measure", ArrayType(StringType()), True),
    StructField("name", StringType(), True),
    StructField("substitutions", ArrayType(StringType()), True),
])

# Whole document: two arrays of structs.
final_schema = StructType([
    StructField("data", ArrayType(data_schema), True),
    StructField("source", ArrayType(source_schema), True),
])

# multiline=True: the file is parsed as one JSON document that spans
# several lines, rather than one JSON object per line.
path = "/FileStore/tables/Datas/EmployeeData.json"
df_json = spark.read.option("multiline", True).schema(final_schema).json(path)

display(df_json)

data,source
"List(List(01000US, United States, 2020, 2020, 326569308, united-states), List(01000US, United States, 2019, 2019, 324697795, united-states), List(01000US, United States, 2018, 2018, 322903030, united-states), List(01000US, United States, 2017, 2017, 321004407, united-states), List(01000US, United States, 2016, 2016, 318558162, united-states), List(01000US, United States, 2015, 2015, 316515021, united-states), List(01000US, United States, 2014, 2014, 314107084, united-states), List(01000US, United States, 2013, 2013, 311536594, united-states))","List(List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List()))"


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, ArrayType
# Explicit schema for the JSON document that is read with
# `.schema(final_schema)`: two top-level arrays of structs, "data" and "source".

# One element of the "data" array.
data_schema = StructType([
    StructField("ID Nation", StringType(), True),
    StructField("Nation", StringType(), True),
    StructField("ID Year", LongType(), True),
    StructField("Year", StringType(), True),
    StructField("Population", LongType(), True),
    StructField("Slug Nation", StringType(), True),
])

# The "annotations" object nested inside each "source" element.
# NOTE(review): the displayed read result shows null for `source_names`
# (and for `measure` below). Possibly the JSON keys are spelled differently
# from these field names - verify against the file.
annotation_schema = StructType([
    StructField("source_names", StringType(), True),
    StructField("source_description", StringType(), True),
    StructField("dataset_name", StringType(), True),
    StructField("dataset_link", StringType(), True),
    StructField("table_id", StringType(), True),
    StructField("topic", StringType(), True),
    StructField("subtopic", StringType(), True),
])

# One element of the "source" array. `measure` was declared non-nullable
# although the value read from the file is null; it is nullable now so the
# declaration matches the data.
source_schema = StructType([
    StructField("annotations", annotation_schema, True),
    StructField("measure", ArrayType(StringType()), True),
    StructField("name", StringType(), True),
    StructField("substitutions", ArrayType(StringType()), True),
])

# Whole document.
final_schema = StructType([
    StructField("data", ArrayType(data_schema), True),
    StructField("source", ArrayType(source_schema), True),
])

In [None]:
from pyspark.sql.functions import explode, col
# Explode both array columns: one output row per (data element, source
# element) pair. The original array columns are dropped afterwards.
Explode_data = (
    df_json
    .withColumn('data_explode', explode('data'))
    .withColumn('source_explode', explode('source'))
    .drop('data', 'source')
)
display(Explode_data)

data_explode,source_explode
"List(01000US, United States, 2020, 2020, 326569308, united-states)","List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List())"
"List(01000US, United States, 2019, 2019, 324697795, united-states)","List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List())"
"List(01000US, United States, 2018, 2018, 322903030, united-states)","List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List())"
"List(01000US, United States, 2017, 2017, 321004407, united-states)","List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List())"
"List(01000US, United States, 2016, 2016, 318558162, united-states)","List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List())"
"List(01000US, United States, 2015, 2015, 316515021, united-states)","List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List())"
"List(01000US, United States, 2014, 2014, 314107084, united-states)","List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List())"
"List(01000US, United States, 2013, 2013, 311536594, united-states)","List(List(null, The American Community Survey (ACS) is conducted by the US Census and sent to a portion of the population every year., ACS 5-year Estimate, http://www.census.gov/programs-surveys/acs/, B01003, Diversity, Demographics), null, acs_yg_total_population_5, List())"


In [None]:
# Flatten every element of the `data` array into top-level columns.
# The fields are listed explicitly to fix the output column order:
# ID Nation, ID Year, Nation, Population, Slug Nation, Year.
data_explode = (
    df_json
    .select(explode('data').alias('data_explode'))
    .select(*[
        col(f'data_explode.{field}').alias(field)
        for field in ('ID Nation', 'ID Year', 'Nation', 'Population', 'Slug Nation', 'Year')
    ])
)
display(data_explode)

ID Nation,ID Year,Nation,Population,Slug Nation,Year
01000US,2020,United States,326569308,united-states,2020
01000US,2019,United States,324697795,united-states,2019
01000US,2018,United States,322903030,united-states,2018
01000US,2017,United States,321004407,united-states,2017
01000US,2016,United States,318558162,united-states,2016
01000US,2015,United States,316515021,united-states,2015
01000US,2014,United States,314107084,united-states,2014
01000US,2013,United States,311536594,united-states,2013


In [None]:
# Read the student CSV through the generic format(...).load(...) reader API:
# header row on, comma as the delimiter.
Read_file = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("Delimiter", ",")
    .load("/FileStore/tables/Datas/Student_Details.csv")
)
display(Read_file)

gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
female,group B,bachelor's degree,standard,none,72,72,74
female,group C,some college,standard,completed,69,90,88
female,group B,master's degree,standard,none,90,95,93
male,group A,associate's degree,free/reduced,none,47,57,44
male,group C,some college,standard,none,76,78,75
female,group B,associate's degree,standard,none,71,83,78
female,group B,some college,standard,completed,88,95,92
male,group B,some college,free/reduced,none,40,43,39
male,group D,high school,free/reduced,completed,64,64,67
female,group B,high school,free/reduced,none,38,60,50


In [None]:
# Write the DataFrame as JSON, replacing any existing output at the path.
# `multiline` is an option for *reading* JSON; the writer does not use it,
# so it is no longer set here.
# DataFrameWriter.json() returns None, so Write_json is None. The name is
# kept only so that existing references to it still resolve.
Write_json = Read_file.write.mode('overwrite').json("Path/json_data")

In [None]:
# Write the DataFrame as Parquet, replacing any existing output at the path.
# NOTE(review): Read_file's column names contain spaces and '/'; some Spark
# releases reject such names when writing Parquet - verify on the target runtime.
Write_Parquet = (
    Read_file.write
    .mode('overwrite')
    .parquet("Path/parquet_data")
)

In [None]:
# Persist the DataFrame as the table "Json_data1", stored in JSON format;
# an existing table of that name is overwritten.
Save_As_table = (
    Read_file.write
    .mode("overwrite")
    .format("json")
    .saveAsTable("Json_data1")
)

In [None]:
# Column names, in the same order as the tuple positions below.
user_schema = ["Product_Name","Issue_Date","Price","Brand","Country","Product_number"]

# Sample product rows. Issue_Date looks like epoch milliseconds.
# NOTE(review): 'null' is the literal string, not a missing value, and
# ' LG' / ' Voltas' start with a space - confirm both are intended.
Raw_data = [
    ('Washing Machine', 1648770933000, 20000, 'Samsung', 'India', '0001'),
    ('Refrigerator', 1648770999000, 35000, ' LG', 'null', '0002'),
    ('Air Cooler', 1648770948000, 45000, ' Voltas', 'null', '0003'),
]

# Only names are supplied, so the column types are inferred from the values.
ProductDF = spark.createDataFrame(Raw_data, user_schema)

ProductDF.printSchema()
display(ProductDF)

root
 |-- Product_Name: string (nullable = true)
 |-- Issue_Date: long (nullable = true)
 |-- Price: long (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Product_number: string (nullable = true)



Product_Name,Issue_Date,Price,Brand,Country,Product_number
Washing Machine,1648770933000,20000,Samsung,India,1
Refrigerator,1648770999000,35000,LG,,2
Air Cooler,1648770948000,45000,Voltas,,3


In [None]:
from pyspark.sql.functions import *
# Issue_Date is treated as epoch milliseconds, while from_unixtime expects
# seconds. Convert with arithmetic (divide by 1000, truncate to long) rather
# than taking the first 10 characters of the number's string form, which is
# only correct when the value has exactly 13 digits.
# The new column holds a formatted string, not a timestamp type.
df = ProductDF.withColumn(
    'Issue_Date_timestamp',
    from_unixtime((col('Issue_Date') / 1000).cast('long'), "yyyy-MM-dd'T'HH:mm:ss"),
)
df.printSchema()
display(df)

root
 |-- Product_Name: string (nullable = true)
 |-- Issue_Date: long (nullable = true)
 |-- Price: long (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Product_number: string (nullable = true)
 |-- Issue_Date_timestamp: string (nullable = true)



Product_Name,Issue_Date,Price,Brand,Country,Product_number,Issue_Date_timestamp
Washing Machine,1648770933000,20000,Samsung,India,1,2022-03-31T23:55:33
Refrigerator,1648770999000,35000,LG,,2,2022-03-31T23:56:39
Air Cooler,1648770948000,45000,Voltas,,3,2022-03-31T23:55:48
