# Basics of PySpark

## Initialize Spark and import libraries

In [1]:
!python --version

Python 3.10.12


In [2]:
import findspark
findspark.init()

In [3]:
findspark.__version__

'2.0.1'

### What is findspark?

findspark is a Python library that helps you locate and use Apache Spark from within a Python script or a Jupyter Notebook environment, especially when Spark is not installed in the system environment variables.


When you install Spark manually, you typically need to set environment variables such as `SPARK_HOME` and add Spark's Python libraries to `PYTHONPATH`. findspark automates this. It locates the Spark installation (from `SPARK_HOME` or a few common install locations) and adds Spark's Python libraries to `sys.path` of the running interpreter, so `import pyspark` works in scripts and notebooks.

A typical use case: with findspark you can use Spark from Python without configuring environment variables by hand. This is particularly useful in development or testing environments where you want to set up Spark quickly.
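To make the description above concrete, here is a simplified sketch of the core idea in plain Python. It is not findspark's actual source. It assumes Spark is unpacked with the usual layout (`<SPARK_HOME>/python` plus a bundled `py4j-*.zip` under `python/lib`), and the function name `init_spark_path` is hypothetical.

```python
import glob
import os
import sys


def init_spark_path(spark_home=None):
    """Hypothetical, simplified model of findspark.init() (not its real code).

    Assumes the standard Spark layout:
    <spark_home>/python and <spark_home>/python/lib/py4j-*.zip
    """
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise ValueError("Set SPARK_HOME or pass spark_home explicitly")

    spark_python = os.path.join(spark_home, "python")
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"No py4j zip found under {spark_python}/lib")

    # Remember the location for child processes, then put Spark's Python
    # package and the py4j bridge at the front of the import path
    os.environ["SPARK_HOME"] = spark_home
    sys.path[:0] = [spark_python, py4j_zips[0]]
    return spark_home
```

In this simplified model, calling `init_spark_path("/opt/spark")` (install path assumed) before `import pyspark` plays the same role as `findspark.init()`.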

In [4]:
import pyspark
pyspark.__version__

'3.5.0'

### What is PySpark and why use it?


PySpark is the Python API for Apache Spark, a powerful open-source distributed computing system. Here are some details about PySpark:

**Apache Spark**: Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in several programming languages, including Scala, Java, Python, and R, for building parallel applications. Spark is designed for distributed processing of large-scale data sets across clusters of computers.

**Python API (PySpark)**: PySpark is the Python API for Apache Spark. It allows Python developers to leverage the power of Spark for data processing and analysis using familiar Python programming constructs. PySpark provides an easy-to-use interface for interacting with Spark's distributed computing engine, enabling scalable and efficient data processing.

**Features:**

* Distributed Computing: PySpark enables distributed processing of data across a cluster of machines, allowing you to scale your data processing tasks horizontally.
* In-Memory Computation: Spark uses in-memory processing to speed up data processing tasks, making it suitable for iterative algorithms and interactive data analysis.
* Rich Ecosystem: Spark comes with a rich set of libraries and tools for various data processing tasks, including SQL queries (Spark SQL), machine learning (MLlib), graph processing (GraphX), and streaming analytics (Spark Streaming).
* Ease of Use: PySpark provides a user-friendly API that abstracts away the complexities of distributed computing, making it easier for Python developers to work with large-scale data sets.
* Integration: PySpark integrates seamlessly with other Python libraries and frameworks such as Pandas, NumPy, Matplotlib, and Scikit-learn, allowing you to leverage the strengths of these libraries in conjunction with Spark.
* Components: PySpark includes several components that work together to enable distributed data processing:
    * Spark Core: The foundational component of Spark that provides distributed task scheduling, fault tolerance, and memory management.
    * Spark SQL: A module for working with structured data using SQL queries and DataFrames (the typed Dataset API is available only in Scala and Java).
    * MLlib: Spark's machine learning library, with scalable implementations of common machine learning algorithms.
    * GraphX: A library for analyzing and processing graph data. GraphX has no Python API; from PySpark, graph workloads usually go through the separate GraphFrames package.
    * Spark Streaming: A library for processing continuous streams of data in near real time. New applications generally use Structured Streaming, which is built on Spark SQL.

**Use Cases**: PySpark is used in a wide range of industries and applications, including big data analytics, data warehousing, machine learning, real-time analytics, and more. It's particularly well-suited for processing large volumes of data efficiently and performing complex analytics tasks at scale.

Overall, PySpark is a versatile and powerful tool for building scalable and efficient data processing applications using Python, leveraging the distributed computing capabilities of Apache Spark.
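The distributed-computing idea can be illustrated without a cluster. Spark splits a DataFrame into partitions and, for an aggregation such as a grouped sum, first computes partial results inside each partition and then merges them by key. The sketch below imitates those two steps with plain Python lists standing in for partitions. The partition contents are hypothetical, and real Spark also shuffles the partial results between executors, which this sketch skips.

```python
from collections import defaultdict

# Two hypothetical partitions of (emp_dept_id, salary) rows
partitions = [
    [("10", 3000), ("20", 4000)],
    [("10", 1000), ("10", 2000)],
]


def partial_sums(rows):
    """Per-partition step: sum salaries by key using only this partition's rows."""
    acc = defaultdict(int)
    for dept, salary in rows:
        acc[dept] += salary
    return dict(acc)


def merge(partials):
    """Combine step: add up the per-partition results for each key."""
    total = defaultdict(int)
    for part in partials:
        for dept, value in part.items():
            total[dept] += value
    return dict(total)


dept_totals = merge(partial_sums(p) for p in partitions)
print(dept_totals)  # {'10': 6000, '20': 4000}
```

Because each partition only needs its own rows for the first step, that step can run in parallel on different machines.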







### What is Hive, and what are its benefits?

Hive is an open-source data warehouse infrastructure built on top of Apache Hadoop for providing data summarization, query, and analysis. Here are some key details about Hive:

**SQL-like Query Language**: Hive provides a SQL-like query language called HiveQL (HQL) for querying and processing large datasets stored in the Hadoop Distributed File System (HDFS) or other compatible file systems. Because HiveQL syntax closely follows SQL, users who already know SQL can pick it up quickly.

**Data Warehousing**: Hive is designed for data warehousing and analytical processing of large datasets. It enables users to run ad-hoc queries, generate reports, and perform data analysis on massive volumes of structured and semi-structured data stored in Hadoop.

**Schema-on-Read:** Unlike traditional relational databases, where the schema is defined and enforced when data is written, Hive follows a schema-on-read approach: data is stored in Hadoop in its raw format, and the table schema is applied only when the data is queried. This provides flexibility in handling diverse and evolving data schemas.
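A toy illustration of schema-on-read in plain Python, assuming a comma-separated file whose lines are kept as raw text. The schema is just a list of (column, type) pairs applied while reading, and in this sketch any value that does not fit its declared type becomes `None`. The names `read_with_schema`, `raw_lines` and `schema` are hypothetical.

```python
# Raw data stays as plain text; nothing is validated when it is stored
raw_lines = ["1,Smith,3000", "2,Rose,4000", "3,Brown,n/a"]

# Hypothetical schema, applied only when the data is read
schema = [("emp_id", int), ("name", str), ("salary", int)]


def read_with_schema(lines, schema):
    """Parse each raw line with the schema; unparseable fields become None."""
    rows = []
    for line in lines:
        row = {}
        for (col, typ), raw in zip(schema, line.split(",")):
            try:
                row[col] = typ(raw)
            except ValueError:
                row[col] = None  # value doesn't fit the declared type
        rows.append(row)
    return rows


for row in read_with_schema(raw_lines, schema):
    print(row)
```

Because the schema is supplied at read time, the same raw lines could later be read with a different schema without rewriting the stored data.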

**Metastore:** Hive includes a metastore, which is a centralized repository that stores metadata information such as table schemas, partition information, column statistics, and storage location of data files. The metastore helps manage the metadata associated with Hive tables and partitions, making it easier to manage and query large datasets.

**Integration with Hadoop Ecosystem:** Hive seamlessly integrates with other components of the Hadoop ecosystem, including Hadoop Distributed File System (HDFS), Hadoop MapReduce, Apache Tez, Apache Spark, and others. This integration allows users to leverage the scalability and fault tolerance of Hadoop for processing and analyzing large datasets.

**Extensible Architecture:** Hive's architecture is extensible: users can plug in custom extensions and UDFs (User-Defined Functions) to extend its functionality. UDFs are typically written in Java (or another JVM language such as Scala), while scripts in other languages, such as Python, can be plugged into queries through Hive's `TRANSFORM` clause.

**Use Cases:** Hive is widely used in various industries and domains for data warehousing, business intelligence, data analysis, and reporting. It is particularly well-suited for batch processing and analytical workloads on large volumes of structured and semi-structured data.

Overall, Hive provides a powerful and scalable solution for data warehousing and analytical processing of big data on Hadoop, enabling organizations to derive insights from large datasets stored in distributed environments.

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.types import TimestampType

In [6]:
spark = SparkSession.builder.master("local[1]").appName("MyApp").getOrCreate()

24/03/19 23:08:50 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.106 instead (on interface wlp1s0)
24/03/19 23:08:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/19 23:08:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Initialize dummy data and create a Spark DataFrame

In [7]:
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", None),
       (6, "Brown", 2, "2010", "50", "", None)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

In [8]:
emp_df = spark.createDataFrame(data=emp, schema=empColumns)
emp_df.show(truncate=False)

                                                                                

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |NULL  |
|6     |Brown   |2              |2010       |50         |      |NULL  |
+------+--------+---------------+-----------+-----------+------+------+



## Find null values

**Count null values in the salary column**

In [9]:
emp_df.select(F.sum(F.col("salary").isNull().cast('int')).alias("null_count")).show()

+----------+
|null_count|
+----------+
|         2|
+----------+



In [10]:
emp_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in emp_df.columns]).show()

+------+----+---------------+-----------+-----------+------+------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+----+---------------+-----------+-----------+------+------+
|     0|   0|              0|          0|          0|     0|     2|
+------+----+---------------+-----------+-----------+------+------+



**Treat empty strings as null values**

In [11]:
emp_df.select([F.sum(F.when((F.col(c)=="" )| (F.col(c).isNull()), 1).otherwise(0)).alias(c) for c in emp_df.columns]).show()

+------+----+---------------+-----------+-----------+------+------+
|emp_id|name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+----+---------------+-----------+-----------+------+------+
|     0|   0|              0|          0|          0|     2|     2|
+------+----+---------------+-----------+-----------+------+------+



In [12]:
emp_df.select([F.sum(F.when((F.col(c) == "") | F.col(c).isNull(), 1)
                     .otherwise(0)).alias(f"{c}_isNull")
               for c in emp_df.columns]).show(truncate=False)

+-------------+-----------+----------------------+------------------+------------------+-------------+-------------+
|emp_id_isNull|name_isNull|superior_emp_id_isNull|year_joined_isNull|emp_dept_id_isNull|gender_isNull|salary_isNull|
+-------------+-----------+----------------------+------------------+------------------+-------------+-------------+
|0            |0          |0                     |0                 |0                 |2            |2            |
+-------------+-----------+----------------------+------------------+------------------+-------------+-------------+
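The null-or-empty check needs both `F.col(c) == ""` and `F.col(c).isNull()` because of SQL's three-valued logic: comparing a null with `""` yields null rather than false, and `when()` treats a null condition like false. The plain-Python sketch below models that behaviour, using `None` for SQL NULL. The helper names are hypothetical and only approximate Spark's semantics for these particular operators.

```python
# None stands in for SQL NULL ("unknown")


def sql_eq(a, b):
    """SQL '=': comparing anything with NULL yields NULL."""
    if a is None or b is None:
        return None
    return a == b


def sql_or(x, y):
    """SQL OR: TRUE wins; otherwise NULL if either side is NULL."""
    if x is True or y is True:
        return True
    if x is None or y is None:
        return None
    return False


def when_otherwise(cond, then_value, else_value):
    """CASE WHEN: only a TRUE condition picks THEN; NULL falls through to ELSE."""
    return then_value if cond is True else else_value


def null_or_empty_flag(value):
    # Models F.when((F.col(c) == "") | F.col(c).isNull(), 1).otherwise(0);
    # isNull() itself is never NULL, so it maps to a plain bool here
    return when_otherwise(sql_or(sql_eq(value, ""), value is None), 1, 0)


print([null_or_empty_flag(v) for v in ["M", "", None]])  # [0, 1, 1]
```

Dropping the `isNull()` branch would make `when_otherwise(sql_eq(None, ""), 1, 0)` return 0, so null values would silently go uncounted.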



## Rename columns using withColumn, selectExpr, withColumnRenamed and alias

In [13]:
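# withColumn adds YOJ as a copy of year_joined; the original column is kept (use withColumnRenamed to rename)
df = emp_df.withColumn("YOJ", F.col("year_joined"))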
df.show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+----+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|YOJ |
+------+--------+---------------+-----------+-----------+------+------+----+
|1     |Smith   |-1             |2018       |10         |M     |3000  |2018|
|2     |Rose    |1              |2010       |20         |M     |4000  |2010|
|3     |Williams|1              |2010       |10         |M     |1000  |2010|
|4     |Jones   |2              |2005       |10         |F     |2000  |2005|
|5     |Brown   |2              |2010       |40         |      |NULL  |2010|
|6     |Brown   |2              |2010       |50         |      |NULL  |2010|
+------+--------+---------------+-----------+-----------+------+------+----+



In [14]:
df = emp_df.selectExpr("emp_id", "year_joined as YOJ", "name as Name")
df.show(truncate=False)

+------+----+--------+
|emp_id|YOJ |Name    |
+------+----+--------+
|1     |2018|Smith   |
|2     |2010|Rose    |
|3     |2010|Williams|
|4     |2005|Jones   |
|5     |2010|Brown   |
|6     |2010|Brown   |
+------+----+--------+



In [15]:
df.select("emp_id", "YOJ", "Name").show(truncate=False)

+------+----+--------+
|emp_id|YOJ |Name    |
+------+----+--------+
|1     |2018|Smith   |
|2     |2010|Rose    |
|3     |2010|Williams|
|4     |2005|Jones   |
|5     |2010|Brown   |
|6     |2010|Brown   |
+------+----+--------+



In [16]:
df = emp_df.withColumnRenamed("year_joined", "YOJ")
df.show(truncate=False)

+------+--------+---------------+----+-----------+------+------+
|emp_id|name    |superior_emp_id|YOJ |emp_dept_id|gender|salary|
+------+--------+---------------+----+-----------+------+------+
|1     |Smith   |-1             |2018|10         |M     |3000  |
|2     |Rose    |1              |2010|20         |M     |4000  |
|3     |Williams|1              |2010|10         |M     |1000  |
|4     |Jones   |2              |2005|10         |F     |2000  |
|5     |Brown   |2              |2010|40         |      |NULL  |
|6     |Brown   |2              |2010|50         |      |NULL  |
+------+--------+---------------+----+-----------+------+------+



In [17]:
df = emp_df.withColumnRenamed("year_joined", "yoj")\
            .withColumnRenamed("name", "Name")
df.show()

+------+--------+---------------+----+-----------+------+------+
|emp_id|    Name|superior_emp_id| yoj|emp_dept_id|gender|salary|
+------+--------+---------------+----+-----------+------+------+
|     1|   Smith|             -1|2018|         10|     M|  3000|
|     2|    Rose|              1|2010|         20|     M|  4000|
|     3|Williams|              1|2010|         10|     M|  1000|
|     4|   Jones|              2|2005|         10|     F|  2000|
|     5|   Brown|              2|2010|         40|      |  NULL|
|     6|   Brown|              2|2010|         50|      |  NULL|
+------+--------+---------------+----+-----------+------+------+
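`withColumnRenamed()` is documented as a no-op when the schema doesn't contain the given column. A typo such as `year_joied` for `year_joined` therefore returns the DataFrame unchanged without raising an error. A small guard can catch this. The helper `rename_strict` below is hypothetical and not part of PySpark; it relies only on `DataFrame.columns` and `DataFrame.withColumnRenamed()`.

```python
def rename_strict(df, existing, new):
    """Hypothetical helper: rename a column, but fail loudly if it is missing.

    Note: this membership test is case-sensitive, while Spark's own column
    resolution is case-insensitive by default (spark.sql.caseSensitive=false).
    """
    if existing not in df.columns:
        raise KeyError(f"Column {existing!r} not found; available: {df.columns}")
    return df.withColumnRenamed(existing, new)


# Usage with the notebook's DataFrame (requires a running SparkSession):
# df = rename_strict(emp_df, "year_joined", "yoj")
```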



In [18]:
df = emp_df.select("emp_id", "Name", \
                   F.col("superior_emp_id").alias("sup_emp_id"), \
                   F.col("year_joined").alias("YOJ"), \
                   "emp_dept_id", \
                   F.col("gender").alias("GEN"), \
                   "salary")

df.show(truncate=False)

+------+--------+----------+----+-----------+---+------+
|emp_id|Name    |sup_emp_id|YOJ |emp_dept_id|GEN|salary|
+------+--------+----------+----+-----------+---+------+
|1     |Smith   |-1        |2018|10         |M  |3000  |
|2     |Rose    |1         |2010|20         |M  |4000  |
|3     |Williams|1         |2010|10         |M  |1000  |
|4     |Jones   |2         |2005|10         |F  |2000  |
|5     |Brown   |2         |2010|40         |   |NULL  |
|6     |Brown   |2         |2010|50         |   |NULL  |
+------+--------+----------+----+-----------+---+------+



In [19]:
emp_df.printSchema()

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



## Use of the filter and where functions

In [20]:
df = emp_df.select("emp_id", "name", "salary").filter(F.col("salary")<=3000)
df.show()

+------+--------+------+
|emp_id|    name|salary|
+------+--------+------+
|     1|   Smith|  3000|
|     3|Williams|  1000|
|     4|   Jones|  2000|
+------+--------+------+



In [21]:
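# emp_dept_id is a string column, so cast it explicitly before the numeric comparison
df = emp_df.filter((F.col("emp_dept_id").cast("int") >= 20) & (F.col("emp_dept_id").cast("int") < 50))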
df.show()

+------+-----+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|     2| Rose|              1|       2010|         20|     M|  4000|
|     5|Brown|              2|       2010|         40|      |  NULL|
+------+-----+---------------+-----------+-----------+------+------+



In [22]:
emp_df.select("*").filter((F.col("salary") >=2000) & (F.col("salary") < 5000)).show()

+------+-----+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|     1|Smith|             -1|       2018|         10|     M|  3000|
|     2| Rose|              1|       2010|         20|     M|  4000|
|     4|Jones|              2|       2005|         10|     F|  2000|
+------+-----+---------------+-----------+-----------+------+------+



In [23]:
df = emp_df.select("emp_id", "name", "salary").where("salary <=3000")
df.show(truncate=False)

+------+--------+------+
|emp_id|name    |salary|
+------+--------+------+
|1     |Smith   |3000  |
|3     |Williams|1000  |
|4     |Jones   |2000  |
+------+--------+------+



In [24]:
df = emp_df.where("salary >= 2000 and salary < 5000")
df.show()

+------+-----+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|     1|Smith|             -1|       2018|         10|     M|  3000|
|     2| Rose|              1|       2010|         20|     M|  4000|
|     4|Jones|              2|       2005|         10|     F|  2000|
+------+-----+---------------+-----------+-----------+------+------+



In [25]:
emp_df.printSchema()

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



## Use of aggregate functions

In [26]:
# show() returns None, so keep the DataFrame and display it separately
df_gpby = emp_df.groupBy("emp_dept_id").agg(F.sum("salary").alias("dept_wise_sal"))
df_gpby.show()

+-----------+-------------+
|emp_dept_id|dept_wise_sal|
+-----------+-------------+
|         40|         NULL|
|         20|         4000|
|         10|         6000|
|         50|         NULL|
+-----------+-------------+



In [27]:
emp_df.groupBy("emp_dept_id", "superior_emp_id").agg(F.sum("salary").alias("aggregate_sum")).show()

+-----------+---------------+-------------+
|emp_dept_id|superior_emp_id|aggregate_sum|
+-----------+---------------+-------------+
|         40|              2|         NULL|
|         10|              2|         2000|
|         20|              1|         4000|
|         10|              1|         1000|
|         10|             -1|         3000|
|         50|              2|         NULL|
+-----------+---------------+-------------+
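The aggregations above return NULL for departments 40 and 50 because all of their salaries are null. Spark's `sum` skips nulls and returns null when a group contains only nulls. The plain-Python sketch below mimics that rule, and the helper names are hypothetical. Filling nulls with 0 before summing, as the `when`/`otherwise` cell later in this notebook does, yields 0 instead. In PySpark, `F.coalesce(F.col("salary"), F.lit(0))` or `df.fillna(0, subset=["salary"])` are common alternatives for that step.

```python
def sql_sum(values):
    """Like SQL SUM: skip NULLs (None); return None if every value is NULL."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None


def coalesce(value, default):
    """Like F.coalesce(col, F.lit(default)): replace NULL with a default."""
    return default if value is None else value


print(sql_sum([3000, 1000, 2000]))                 # 6000  (dept 10)
print(sql_sum([None]))                             # None  (dept 40 -> NULL)
print(sql_sum([coalesce(v, 0) for v in [None]]))   # 0 after filling nulls
```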



In [28]:
emp_df.printSchema()

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



**Cast the salary column to integer**

In [29]:
df = emp_df.select(F.col("emp_id"), \
                   F.col("name"), \
                   F.col("superior_emp_id"), \
                   F.col("emp_dept_id"), \
                   F.col("gender"), \
                   F.col("salary").cast("int").alias("salary"))
df.show()

+------+--------+---------------+-----------+------+------+
|emp_id|    name|superior_emp_id|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+------+------+
|     1|   Smith|             -1|         10|     M|  3000|
|     2|    Rose|              1|         20|     M|  4000|
|     3|Williams|              1|         10|     M|  1000|
|     4|   Jones|              2|         10|     F|  2000|
|     5|   Brown|              2|         40|      |  NULL|
|     6|   Brown|              2|         50|      |  NULL|
+------+--------+---------------+-----------+------+------+



In [30]:
df.printSchema()

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [31]:
df = emp_df.withColumn("salary", F.when(F.col("salary").cast("int").isNull(), 0).otherwise(F.col("salary").cast('int')))
df.show()

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |     0|
|     6|   Brown|              2|       2010|         50|      |     0|
+------+--------+---------------+-----------+-----------+------+------+



In [32]:
df.groupBy("emp_dept_id").agg(F.sum("salary").alias("total_sal")).show()

+-----------+---------+
|emp_dept_id|total_sal|
+-----------+---------+
|         40|        0|
|         20|     4000|
|         10|     6000|
|         50|        0|
+-----------+---------+



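## Use of the when function
`when()` works like an `if ... else` expression: rows that satisfy the condition get the first value, and `.otherwise()` supplies the value for every other row. In the example below the two branches mix an integer (`1`) and a string (`"A"`), so the resulting `sup_emp_id` column is a string column.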

In [33]:
df.withColumn("sup_emp_id", F.when(F.col("superior_emp_id")>0, 1).otherwise("A")).show()

+------+--------+---------------+-----------+-----------+------+------+----------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|sup_emp_id|
+------+--------+---------------+-----------+-----------+------+------+----------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|         A|
|     2|    Rose|              1|       2010|         20|     M|  4000|         1|
|     3|Williams|              1|       2010|         10|     M|  1000|         1|
|     4|   Jones|              2|       2005|         10|     F|  2000|         1|
|     5|   Brown|              2|       2010|         40|      |     0|         1|
|     6|   Brown|              2|       2010|         50|      |     0|         1|
+------+--------+---------------+-----------+-----------+------+------+----------+
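Chained `F.when()` calls behave like a SQL `CASE WHEN`. Conditions are checked in order, the first true one wins, and a null condition counts as not true. Without `.otherwise()`, unmatched rows get null. The plain-Python sketch below models that evaluation order; `case_when` and `sup_flag` are hypothetical names. One difference to keep in mind: Python keeps `1` as an int, while Spark coerces both branch values to a single column type.

```python
def case_when(branches, otherwise=None):
    """Evaluate (condition, value) pairs in order, like chained F.when(...).

    The first branch whose condition is exactly True wins; a None (NULL)
    condition is treated like False. With no match, `otherwise` is returned
    (None if omitted, as Spark yields NULL without .otherwise()).
    """
    for condition, value in branches:
        if condition is True:
            return value
    return otherwise


def sup_flag(superior_emp_id):
    # Models F.when(F.col("superior_emp_id") > 0, 1).otherwise("A")
    cond = None if superior_emp_id is None else superior_emp_id > 0
    return case_when([(cond, 1)], otherwise="A")


print([sup_flag(v) for v in [-1, 1, 2, None]])  # ['A', 1, 1, 'A']
```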



In [34]:
groupBy_df = df.groupBy("emp_dept_id").agg(F.sum("salary").alias("salary"))
groupBy_df.show()

+-----------+------+
|emp_dept_id|salary|
+-----------+------+
|         40|     0|
|         20|  4000|
|         10|  6000|
|         50|     0|
+-----------+------+



## Find unique values

In [35]:
dist_val = df.select("superior_emp_id").distinct().collect()
dist_val

[Row(superior_emp_id=1), Row(superior_emp_id=2), Row(superior_emp_id=-1)]

In [36]:
unique_sup_emp_id = [r.superior_emp_id for r in dist_val]
unique_sup_emp_id

[1, 2, -1]

In [37]:
df.printSchema()

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



# Read data from the local machine and create a Spark DataFrame

In [38]:
!ls datasets/

 auto-mpg.csv				  kc_house_data.csv
 brand-laptops-dataset.zip		  laptops.csv
 covid.json				  train_info.csv
 Employee.csv				  train_schedule.csv
'IEA-EV-dataEV salesCarsHistorical.csv'   train_schedule.csv.zip
 indian_states_code_name_mapping.csv


In [39]:
house_df = spark.read.format("csv")\
                .option("inferSchema", True)\
                .option("header", True)\
                .load("datasets/kc_house_data.csv")


house_df.show(truncate=False)

                                                                                

+----------+---------------+---------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+
|id        |date           |price    |bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|lat    |long    |sqft_living15|sqft_lot15|
+----------+---------------+---------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+
|7129300520|20141013T000000|221900.0 |3       |1.0      |1180       |5650    |1.0   |0         |0   |3        |7    |1180      |0            |1955    |0           |98178  |47.5112|-122.257|1340         |5650      |
|6414100192|20141209T000000|538000.0 |3       |2.25     |2570       |7242    |2.0   |0         |0   |3        |7    |2170      |400         

## List all columns 

In [40]:
house_df.columns

['id',
 'date',
 'price',
 'bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view',
 'condition',
 'grade',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'yr_renovated',
 'zipcode',
 'lat',
 'long',
 'sqft_living15',
 'sqft_lot15']

## Select a list of columns

In [41]:
df = house_df.select("id", "bedrooms","sqft_living","sqft_lot","floors","waterfront", 
                     "view","grade","condition","yr_built","yr_renovated", "date","price")
df.show(truncate=False)

+----------+--------+-----------+--------+------+----------+----+-----+---------+--------+------------+---------------+---------+
|id        |bedrooms|sqft_living|sqft_lot|floors|waterfront|view|grade|condition|yr_built|yr_renovated|date           |price    |
+----------+--------+-----------+--------+------+----------+----+-----+---------+--------+------------+---------------+---------+
|7129300520|3       |1180       |5650    |1.0   |0         |0   |7    |3        |1955    |0           |20141013T000000|221900.0 |
|6414100192|3       |2570       |7242    |2.0   |0         |0   |7    |3        |1951    |1991        |20141209T000000|538000.0 |
|5631500400|2       |770        |10000   |1.0   |0         |0   |6    |3        |1933    |0           |20150225T000000|180000.0 |
|2487200875|4       |1960       |5000    |1.0   |0         |0   |7    |5        |1965    |0           |20141209T000000|604000.0 |
|1954400510|3       |1680       |8080    |1.0   |0         |0   |8    |3        |1987    |

In [42]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- price: double (nullable = true)



## Distinct grade values

In [43]:
grade_type = df.select("grade").distinct().collect()
grade_type

[Row(grade=12),
 Row(grade=1),
 Row(grade=13),
 Row(grade=6),
 Row(grade=3),
 Row(grade=5),
 Row(grade=9),
 Row(grade=4),
 Row(grade=8),
 Row(grade=7),
 Row(grade=10),
 Row(grade=11)]

In [45]:
grade_type = [row.grade for row in grade_type]
grade_type.sort()
grade_type

[1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]

## Count rows per grade

In [46]:
df.groupBy("grade").agg(F.count("grade").alias("total_grade_type")).show()

+-----+----------------+
|grade|total_grade_type|
+-----+----------------+
|   12|              90|
|    1|               1|
|   13|              13|
|    6|            2038|
|    3|               3|
|    5|             242|
|    9|            2615|
|    4|              29|
|    8|            6068|
|    7|            8981|
|   10|            1134|
|   11|             399|
+-----+----------------+



In [47]:
df.select("id", "bedrooms", "sqft_living", "sqft_lot", "grade", "condition", "yr_built", "yr_renovated", "date", "price")\
    .filter(F.col("bedrooms")==3).show(truncate=False)

+----------+--------+-----------+--------+-----+---------+--------+------------+---------------+---------+
|id        |bedrooms|sqft_living|sqft_lot|grade|condition|yr_built|yr_renovated|date           |price    |
+----------+--------+-----------+--------+-----+---------+--------+------------+---------------+---------+
|7129300520|3       |1180       |5650    |7    |3        |1955    |0           |20141013T000000|221900.0 |
|6414100192|3       |2570       |7242    |7    |3        |1951    |1991        |20141209T000000|538000.0 |
|1954400510|3       |1680       |8080    |8    |3        |1987    |0           |20150218T000000|510000.0 |
|1321400060|3       |1715       |6819    |7    |3        |1995    |0           |20140627T000000|257500.0 |
|2008000270|3       |1060       |9711    |7    |3        |1963    |0           |20150115T000000|291850.0 |
|2414600126|3       |1780       |7470    |7    |3        |1960    |0           |20150415T000000|229500.0 |
|3793500160|3       |1890       |6560

## Convert a string datetime to the timestamp data type
The schema below shows that the `date` column stores datetimes as strings (for example `20141013T000000`). Convert it to the `timestamp` data type.

In [48]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- price: double (nullable = true)



In [49]:
# to_timestamp already returns a timestamp column, so no extra cast is needed
df_ts = df.withColumn("datetime", F.to_timestamp(F.col("date"), "yyyyMMdd'T'HHmmss")).drop("date")
df_ts.show()

+----------+--------+-----------+--------+------+----------+----+-----+---------+--------+------------+---------+-------------------+
|        id|bedrooms|sqft_living|sqft_lot|floors|waterfront|view|grade|condition|yr_built|yr_renovated|    price|           datetime|
+----------+--------+-----------+--------+------+----------+----+-----+---------+--------+------------+---------+-------------------+
|7129300520|       3|       1180|    5650|   1.0|         0|   0|    7|        3|    1955|           0| 221900.0|2014-10-13 00:00:00|
|6414100192|       3|       2570|    7242|   2.0|         0|   0|    7|        3|    1951|        1991| 538000.0|2014-12-09 00:00:00|
|5631500400|       2|        770|   10000|   1.0|         0|   0|    6|        3|    1933|           0| 180000.0|2015-02-25 00:00:00|
|2487200875|       4|       1960|    5000|   1.0|         0|   0|    7|        5|    1965|           0| 604000.0|2014-12-09 00:00:00|
|1954400510|       3|       1680|    8080|   1.0|         0|  

In [50]:
df_ts.printSchema()

root
 |-- id: long (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- datetime: timestamp (nullable = true)
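Spark's datetime pattern `yyyyMMdd'T'HHmmss` maps directly onto Python's `strptime` codes, which gives a quick way to sanity-check a pattern on a sample value before running it on a whole column. The `T` is quoted in the Spark pattern because letters there are read as pattern symbols; in the Python version it is simply a literal character. The variable names below are illustrative. One behavioural difference: `strptime` raises `ValueError` on a mismatch, whereas Spark's `to_timestamp` generally returns null for non-matching strings (with ANSI mode off, the default in Spark 3.5).

```python
from datetime import datetime

# Spark pattern  yyyyMMdd'T'HHmmss  ->  Python pattern  %Y%m%dT%H%M%S
PY_PATTERN = "%Y%m%dT%H%M%S"

raw = "20141013T000000"  # first `date` value in the house dataset
parsed = datetime.strptime(raw, PY_PATTERN)
print(parsed)                                 # 2014-10-13 00:00:00
print(parsed.year, parsed.month, parsed.day)  # 2014 10 13
```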



## Extract year, month and day from a timestamp

In [51]:
df_ts.select(F.day(F.col("datetime")).alias("date"), \
             F.month(F.col("datetime")).alias("month"), \
             F.year(F.col("datetime")).alias("year") ).show()

+----+-----+----+
|date|month|year|
+----+-----+----+
|  13|   10|2014|
|   9|   12|2014|
|  25|    2|2015|
|   9|   12|2014|
|  18|    2|2015|
|  12|    5|2014|
|  27|    6|2014|
|  15|    1|2015|
|  15|    4|2015|
|  12|    3|2015|
|   3|    4|2015|
|  27|    5|2014|
|  28|    5|2014|
|   7|   10|2014|
|  12|    3|2015|
|  24|    1|2015|
|  31|    7|2014|
|  29|    5|2014|
|   5|   12|2014|
|  24|    4|2015|
+----+-----+----+
only showing top 20 rows
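`F.day`, `F.month` and `F.year` each extract one calendar field from the timestamp column (`F.dayofmonth` returns the same value as `F.day`). For intuition only, the same split on a single Python `datetime` is plain attribute access; this sketch is not Spark code, and it uses the first three dates from the output above. `split_date` is a hypothetical helper name.

```python
from datetime import datetime


def split_date(ts: datetime) -> tuple[int, int, int]:
    """Return (day, month, year), mirroring F.day, F.month and F.year on one value."""
    return ts.day, ts.month, ts.year


rows = [datetime(2014, 10, 13), datetime(2014, 12, 9), datetime(2015, 2, 25)]
for ts in rows:
    print(split_date(ts))
# (13, 10, 2014)
# (9, 12, 2014)
# (25, 2, 2015)
```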



## Count total houses sold per year

In [52]:
df_ts.groupBy(F.year("datetime").alias("year")).agg(F.count("id").alias("yearly_sold_out")).show(truncate=False)

+----+---------------+
|year|yearly_sold_out|
+----+---------------+
|2015|6980           |
|2014|14633          |
+----+---------------+
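`groupBy(F.year("datetime"))` followed by `agg(F.count("id"))` is a count per key. As an in-memory analogue (not Spark), `collections.Counter` over the sale years produces the same kind of result; the sample below is the first five sale dates from the day/month/year output above, and `yearly_counts` is a hypothetical helper. One difference: `F.count("id")` skips rows where `id` is null, while `Counter` counts every element.

```python
from collections import Counter
from datetime import datetime


def yearly_counts(sale_dates: list[datetime]) -> Counter:
    """Count sales per calendar year, like groupBy(year).agg(count)."""
    return Counter(ts.year for ts in sale_dates)


sample = [
    datetime(2014, 10, 13),
    datetime(2014, 12, 9),
    datetime(2015, 2, 25),
    datetime(2014, 12, 9),
    datetime(2015, 2, 18),
]
print(yearly_counts(sample))
```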




In [53]:
!ls datasets

 auto-mpg.csv				  kc_house_data.csv
 brand-laptops-dataset.zip		  laptops.csv
 covid.json				  train_info.csv
 Employee.csv				  train_schedule.csv
'IEA-EV-dataEV salesCarsHistorical.csv'   train_schedule.csv.zip
 indian_states_code_name_mapping.csv


In [54]:
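# The CSV reader option is spelled "inferSchema"; an unrecognized key such as
# "schemaInfer" is ignored without an error, and every column is then read as string.
df = spark.read.format("csv")\
        .option("inferSchema", True)\
        .option("header", True)\
        .load("datasets/auto-mpg.csv")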

df.show(truncate=False)

+---+---------+------------+----------+------+------------+----------+------+----------------------------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|car name                    |
+---+---------+------------+----------+------+------------+----------+------+----------------------------+
|18 |8        |307         |130       |3504  |12          |70        |1     |chevrolet chevelle malibu   |
|15 |8        |350         |165       |3693  |11.5        |70        |1     |buick skylark 320           |
|18 |8        |318         |150       |3436  |11          |70        |1     |plymouth satellite          |
|16 |8        |304         |150       |3433  |12          |70        |1     |amc rebel sst               |
|17 |8        |302         |140       |3449  |10.5        |70        |1     |ford torino                 |
|15 |8        |429         |198       |4341  |10          |70        |1     |ford galaxie 500            |
|14 |8        |454         |220      
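The `inferSchema` option asks Spark to scan the data and choose a type for each column; without it every CSV column is read as string. The function below is a deliberately simplified illustration of that idea (try integer, then double, otherwise string). Spark's real inference also handles long, decimal, boolean, timestamp and null values, so treat this only as a mental model. The sample values come from the rows shown above, except the `"?"`, which is a hypothetical placeholder showing how one non-numeric token forces the whole column to string.

```python
from typing import Callable


def infer_type(values: list[str]) -> str:
    """Simplified sketch of CSV type inference: integer -> double -> string."""

    def all_parse(cast: Callable[[str], object]) -> bool:
        # True only if every value in the column converts without error
        try:
            for v in values:
                cast(v)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    return "string"


print(infer_type(["18", "15", "18", "16", "17"]))       # integer
print(infer_type(["12", "11.5", "11", "12", "10.5"]))  # double
print(infer_type(["130", "165", "150", "?"]))          # string
```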

## Read another dataset 

In [55]:
import os

In [56]:
for dirname, _, filenames in os.walk('datasets'):
    for filename in filenames:
        if filename == "laptops.csv":
            path = os.path.join(dirname, filename)
            print(path)
            # inferSchema lets Spark pick integer/double/boolean types instead of string
            df = spark.read.format("csv")\
                        .option("inferSchema", True)\
                        .option("header", True)\
                        .load(path)

datasets/laptops.csv
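The loop above reads the file inside the walk, so if `laptops.csv` existed in several subdirectories only the last match would be kept in `df`. A small helper that first collects every matching path makes that case visible before any reading happens. `find_files` is a hypothetical name; the sketch uses only `os.walk` from the standard library.

```python
import os


def find_files(root: str, name: str) -> list[str]:
    """Return every path under `root` whose file name equals `name`."""
    matches = []
    for dirname, _, filenames in os.walk(root):
        if name in filenames:
            matches.append(os.path.join(dirname, name))
    return matches


if __name__ == "__main__":
    # A missing directory simply yields an empty list: os.walk produces no entries.
    print(find_files("datasets", "laptops.csv"))
```

With the paths in hand you can decide explicitly which one to pass to `spark.read...load(...)`; `load` also accepts a list of paths if you want all of them in one DataFrame.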


In [57]:
df.show(2)

+-----+-----+--------------------+-----+------+---------------+--------------+---------+-----------+----------+--------------------+------------------------+----------------------+--------------------------+---------+----------+---------------+------------+----------------+-----------------+-------+----------------+
|index|brand|               Model|Price|Rating|processor_brand|processor_tier|num_cores|num_threads|ram_memory|primary_storage_type|primary_storage_capacity|secondary_storage_type|secondary_storage_capacity|gpu_brand|  gpu_type|is_touch_screen|display_size|resolution_width|resolution_height|     OS|year_of_warranty|
+-----+-----+--------------------+-----+------+---------------+--------------+---------+-----------+----------+--------------------+------------------------+----------------------+--------------------------+---------+----------+---------------+------------+----------------+-----------------+-------+----------------+
|    1|tecno|Tecno Megabook T1...|23990|    63

In [58]:
df.columns

['index',
 'brand',
 'Model',
 'Price',
 'Rating',
 'processor_brand',
 'processor_tier',
 'num_cores',
 'num_threads',
 'ram_memory',
 'primary_storage_type',
 'primary_storage_capacity',
 'secondary_storage_type',
 'secondary_storage_capacity',
 'gpu_brand',
 'gpu_type',
 'is_touch_screen',
 'display_size',
 'resolution_width',
 'resolution_height',
 'OS',
 'year_of_warranty']

In [59]:
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- brand: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- processor_brand: string (nullable = true)
 |-- processor_tier: string (nullable = true)
 |-- num_cores: integer (nullable = true)
 |-- num_threads: integer (nullable = true)
 |-- ram_memory: integer (nullable = true)
 |-- primary_storage_type: string (nullable = true)
 |-- primary_storage_capacity: integer (nullable = true)
 |-- secondary_storage_type: string (nullable = true)
 |-- secondary_storage_capacity: integer (nullable = true)
 |-- gpu_brand: string (nullable = true)
 |-- gpu_type: string (nullable = true)
 |-- is_touch_screen: boolean (nullable = true)
 |-- display_size: double (nullable = true)
 |-- resolution_width: integer (nullable = true)
 |-- resolution_height: integer (nullable = true)
 |-- OS: string (nullable = true)
 |-- year_of_warranty: string (nullable = true)



In [61]:
#[5 Apr 2018,012101], [09 Apr 2018, 015102]

In [None]:
#OT 015102, 012101