### What is PySpark?

PySpark is the Python API for **Apache Spark**, a powerful open-source framework for **distributed computing**. It allows for large-scale data processing, providing easy-to-use APIs for working with big data and performing **data analysis**. Spark can process data much faster than traditional tools such as **Hadoop MapReduce** by keeping data in memory rather than writing intermediate results to disk.

Spark works by distributing the data processing tasks across multiple nodes (machines) in a cluster, allowing it to handle massive amounts of data in parallel. PySpark brings the distributed processing capabilities of Apache Spark to Python, making it accessible for data scientists, engineers, and analysts familiar with Python.

### Key Concepts in PySpark

1. **Resilient Distributed Datasets (RDDs)**:
   - RDD is the fundamental data structure in Spark. It represents a collection of objects that can be processed in parallel across multiple nodes in a cluster.
   - RDDs are **immutable**, meaning once they are created, they cannot be modified. However, transformations can be applied to generate new RDDs.

2. **SparkContext**:
   - The **SparkContext** is the entry point to any Spark functionality. It establishes the connection to the Spark cluster, enabling the user to create RDDs, broadcast variables, and perform actions on them.

3. **SparkSession**:
   - The **SparkSession** is the newer and recommended entry point for working with PySpark, as it provides a more unified interface. It allows working with both **RDDs** (through `spark.sparkContext`) and **DataFrames**.
   - SparkSession is used to create and manage Spark jobs and enables working with the SQL and DataFrame APIs. (The typed Dataset API is available only in Scala and Java; in Python, DataFrames fill that role.)

4. **Transformations vs Actions**:
   - **Transformations** are operations that define a new RDD or DataFrame from an existing one, like **map()**, **filter()**, **flatMap()**. Transformations are **lazy**, meaning they are only executed when an **action** is triggered.
   - **Actions** are operations that return a result to the driver or write data to an external storage system, like **count()**, **collect()**, **saveAsTextFile()**.

5. **DataFrames**:
   - DataFrames are similar to **tables** in relational databases and **Pandas DataFrames** in Python. They provide a higher-level abstraction than RDDs and are optimized for performing **complex queries** and **data manipulations**.

### Why Use PySpark?

- **Scalability**: PySpark can scale up from a single machine to thousands of nodes, processing petabytes of data.
- **Speed**: PySpark can process data in-memory, which significantly improves the speed of computation compared to traditional systems like Hadoop MapReduce.
- **Ease of Use**: PySpark provides a high-level API for distributed data processing that is accessible to Python developers, making it easier to integrate into Python-based workflows.
- **Unified Processing**: PySpark supports both batch processing and stream processing, giving you flexibility for various use cases.

### PySpark Ecosystem

1. **Spark SQL**: Allows querying data via SQL syntax.
2. **MLlib**: Spark’s scalable machine learning library, providing algorithms for classification, regression, clustering, and more.
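3. **GraphX**: Spark's library for graph processing (available from Scala and Java; it has no Python API).
4. **Spark Streaming**: Enables real-time stream processing with Spark. Newer applications typically use **Structured Streaming**, which is built on the DataFrame API.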

---




# Introduction to Big Data

**Big Data** refers to a vast amount of data that is difficult to process using traditional data processing tools due to its **large volume**, **complexity**, and **variety**. It includes data that is so massive in size and diverse in format that traditional databases and processing systems struggle to handle it efficiently.

### Key Characteristics of Big Data:
- **Volume**: The sheer amount of data generated daily is enormous.
- **Variety**: Big data can come in different formats such as structured, unstructured, and semi-structured.
- **Velocity**: Data is generated at high speed, often requiring near-real-time processing.
- **Veracity**: The uncertainty of data due to its vastness and inconsistency.

---

# Big Data in Social Media

Did you know that **500+ terabytes** of new data are ingested into Facebook's databases every day? This data includes:
- Photo and video uploads
- Message exchanges
- Comments and other interactions

Additionally, industries like aviation generate massive amounts of data. For example:
- A single **jet engine** can produce **10+ terabytes of data** in just **30 minutes** of flight time.
- With **thousands of flights per day**, the total data generated amounts to **petabytes**.

---

# Types of Big Data

Big Data can be categorized into three types:

### 1. Structured Data
- Data that is organized in a fixed format, typically in rows and columns (e.g., databases).
- Easily stored, accessed, and processed using traditional data tools (like SQL).

### 2. Unstructured Data
- Data that has no predefined structure or format. It can come in various forms, including:
  - Text files
  - Images
  - Videos
- This type of data is harder to process but provides rich insights when analyzed.

### 3. Semi-structured Data
- Data that falls between structured and unstructured, containing elements of both.
- Examples: JSON and XML files, and records stored in NoSQL databases with flexible schemas.

---

# Big Data Tools

To handle Big Data, several tools are commonly used:

- **Apache Hadoop**: A framework for distributed storage and processing of large datasets.
- **Apache Spark**: A fast and general-purpose cluster-computing system for large-scale data processing.
- **Apache Cassandra**: A highly scalable NoSQL database designed for handling large amounts of data across many commodity servers.
- **NoSQL Databases**: Databases like MongoDB and Couchbase that handle semi-structured and unstructured data.
- **Data Visualization Tools**: Tools like Tableau and Power BI for visualizing complex data insights.
- **Machine Learning Libraries**: Libraries such as TensorFlow and Scikit-learn for applying machine learning to big data.

---

# What is Apache Spark?

**Apache Spark** is a powerful, **lightning-fast** cluster-computing technology designed for big data processing. It extends the traditional **Hadoop MapReduce** model and includes support for:
- **Interactive Queries**: Allowing real-time data analysis.
- **Stream Processing**: Real-time data processing capabilities.

Spark can handle a variety of workloads, including:
- **Batch Applications**: Processing large volumes of data at once.
- **Iterative Algorithms**: Running algorithms that require multiple passes over the data.
- **Interactive Queries**: Performing quick queries on massive datasets.
- **Streaming Data**: Handling real-time data processing needs.

---

# Evolution of Apache Spark

- **2009**: Spark was started as a research project at UC Berkeley's AMPLab.
- **2010**: It was open-sourced under a BSD license.
- **2013**: It was donated to the Apache Software Foundation, and it became a top-level Apache project in February 2014.
- Since then, Spark has grown in popularity due to its speed and versatility in handling Big Data workloads.

---

# Features of Apache Spark

1. **Speed**: Spark can run applications on a Hadoop cluster up to **100 times faster** than traditional MapReduce when working in memory, and up to about 10 times faster when running on disk.
2. **Supports Multiple Languages**: Spark provides built-in APIs in:
   - **Java**
   - **Scala**
   - **Python**
   - **R**
3. **Advanced Analytics**: Spark supports a variety of advanced analytics, including:
   - **SQL Queries**
   - **Stream Processing**
   - **Machine Learning (ML)**
   - **Graph Algorithms**

---

# Spark’s Basic Architecture

Spark runs on a **cluster** of machines that work together to process data. The cluster is managed by a **Cluster Manager**, which can be:
- **Spark's standalone cluster manager**
- **YARN** (Yet Another Resource Negotiator)
- **Kubernetes**
- **Mesos** (deprecated since Spark 3.2)

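The **SparkContext** runs in the driver program. It initializes the Spark application, connects to the cluster manager, and sends tasks to the executors that the cluster manager allocates. Managing the cluster's machines themselves is the cluster manager's job.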

---

# Spark’s Language APIs

Spark provides APIs in several programming languages, allowing developers to use the language they are most comfortable with:
- **Scala**
- **Java**
- **Python**
- **SQL**
- **R**

---

# Spark’s APIs

Spark offers two main types of APIs:
1. **Low-level unstructured APIs** (RDDs and shared variables such as broadcast variables and accumulators): for advanced users who need full control over how Spark processes data.
2. **High-level structured APIs** (DataFrames and SQL): easier to use and optimized for most use cases.

**PySpark** is the Python API for Apache Spark. It allows Python developers to interact with Spark through the Python programming language, making it accessible to data scientists and engineers.

---

# PySpark Abstractions: RDDs and DataFrames

- **RDDs (Resilient Distributed Datasets)**: The fundamental data structure in Spark. RDDs are immutable and support fault-tolerant parallel processing across multiple machines.
- **DataFrames**: A higher-level abstraction over RDDs that resembles tables in relational databases. DataFrames are optimized for querying and transforming data.


# Setting Up PySpark

Before working with PySpark, install **PySpark** in your Python environment. In a Jupyter notebook cell, you can run:



In [None]:
!pip install pyspark



### Importing PySpark

In [None]:
import pyspark

### Setting Up SparkContext

**SparkConf** holds the configuration parameters for a Spark application.
- You can use SparkConf to set the application name, the Spark master URL, and other runtime properties.

**SparkContext** is responsible for coordinating the execution of Spark jobs on a cluster.

`conf = SparkConf().setMaster("local").setAppName("HappyLearning")`

---

This line creates a Spark configuration object (`conf`) that tells Apache Spark how to run your application.
- `.setMaster("local")` tells Spark where to run. `"local"` means run on your local machine using a single thread (`"local[*]"` would use all available cores).

- `.setAppName("HappyLearning")` sets the name of your Spark application.

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("HappyLearning")
sc = SparkContext(conf = conf)
sc

### Creating an RDD (Resilient Distributed Dataset)

In [None]:
stringRDD = sc.parallelize(["Spark is awesome","Spark is cool"])
stringRDD

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

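### Displaying the RDD Object

Evaluating an RDD only shows its description; the data itself is not computed until an action such as `collect()` runs.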

In [None]:
stringRDD

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

### Collecting the RDD Data

In [None]:
stringRDD.collect()

['Spark is awesome', 'Spark is cool']

### Transforming Data in RDDs (Using `map()`)

In [None]:
stringRDD_uppercase= stringRDD.map(lambda x: x.upper())
stringRDD_uppercase.collect()

['SPARK IS AWESOME', 'SPARK IS COOL']

### Using `flatMap()` Transformation

In [None]:
flatMap_Split= stringRDD.flatMap(lambda x: x.split(" "))
flatMap_Split.collect()

['Spark', 'is', 'awesome', 'Spark', 'is', 'cool']

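### Verifying That the Original RDD Is Unchanged

Transformations such as `map()` and `flatMap()` return new RDDs, so `stringRDD` still holds the original lines.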
In [None]:
stringRDD.collect()

['Spark is awesome', 'Spark is cool']

### Using `filter()` Transformation

In [None]:
awesomeLineRDD = stringRDD.filter(lambda x: "awesome" in x)
awesomeLineRDD.collect()

['Spark is awesome']

### Using `filter()` with Case Insensitivity

In [None]:
sparkLineRDD = stringRDD.filter(lambda x: "spark" in x.lower())
sparkLineRDD.collect()

['Spark is awesome', 'Spark is cool']

### Using `pandas` to Read a CSV File

In [None]:
import pandas as pd
type(pd.read_csv('test1.csv'))

### What is SparkSession in PySpark?

In **PySpark**, **SparkSession** is the main entry point to interact with **Apache Spark**. It serves as the interface for working with structured data (e.g., DataFrames) and running SQL queries. A **SparkSession** allows you to manage the Spark application and access the various Spark features.

#### Key Features of SparkSession:
- **Create DataFrames**: You can create **DataFrames** using data from various sources (CSV, Parquet, JSON, etc.) and perform data transformations on them.
- **Register DataFrames as Tables**: SparkSession allows you to register DataFrames as temporary SQL tables, enabling SQL queries on the DataFrame.
- **Execute SQL Queries**: With SparkSession, you can execute **SQL queries** on registered tables, enabling you to perform SQL-based operations on your data.
- **Distributed Data Operations**: It also enables performing distributed data processing using Spark’s **engine**, allowing operations to be applied on large datasets spread across multiple nodes.

#### Example Usage:
1. **Creating DataFrames**: You can read data from external sources (e.g., CSV, JSON, or Parquet) and create DataFrames.
2. **SQL Queries**: After registering a DataFrame as a table, you can run SQL queries directly using **`spark.sql()`**.
3. **Optimized Execution**: SparkSession provides an optimized engine for processing large-scale data, including lazy evaluation and distributed computing.

### Key Points:
- **SparkSession** is the main entry point for using **Spark SQL** and working with **structured data** in PySpark.
- It simplifies the process of managing Spark sessions, enabling the use of both **DataFrame** and **SQL** APIs.


### Initializing SparkSession


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practise').getOrCreate()


In [None]:
spark

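### Reading Data from a CSV File and Displaying It

Without any options, Spark treats the header line as an ordinary data row and names the columns `_c0` to `_c3`.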

In [None]:
df_pyspark = spark.read.csv('test1.csv')
df_pyspark

df_pyspark.show()

+-------+---+----------+------+
|    _c0|_c1|       _c2|   _c3|
+-------+---+----------+------+
|   Name|age|Experience|Salary|
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Reading CSV with Header and Displaying Data

In [None]:
df_pyspark = spark.read.option('header','true').csv('test1.csv')
print(df_pyspark)  # Check Schema
df_pyspark.show()

DataFrame[Name: string, age: string, Experience: string, Salary: string]
+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Checking the DataFrame Type

In [None]:
type(df_pyspark)

### Displaying the Schema of a DataFrame

In [None]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



### Displaying the Top Rows of a DataFrame

In [None]:
df_pyspark.show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Displaying DataFrame Column Data Types

In [None]:
print(df_pyspark.dtypes)


[('Name', 'string'), ('age', 'string'), ('Experience', 'string'), ('Salary', 'string')]


### Displaying the Column Names of a DataFrame

In [None]:
print(df_pyspark.columns)

['Name', 'age', 'Experience', 'Salary']


### Displaying the First Few Rows of a DataFrame

In [None]:
df_pyspark.head(3)

[Row(Name='Sourabh', age='31', Experience='10', Salary='30000'),
 Row(Name='Disha', age='30', Experience='8', Salary='25000'),
 Row(Name='Sunny', age='29', Experience='4', Salary='20000')]

### Displaying Rows with `show()` (Up to 20 Rows by Default)

In [None]:
df_pyspark.show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Selecting Specific Columns from a DataFrame

In [None]:
df_pyspark.select(['Name','Experience']).show()

df_pyspark.select('Name').show()

+-------+----------+
|   Name|Experience|
+-------+----------+
|Sourabh|        10|
|  Disha|         8|
|  Sunny|         4|
|   Paul|         3|
| Harsha|         1|
|Shubham|         2|
+-------+----------+

+-------+
|   Name|
+-------+
|Sourabh|
|  Disha|
|  Sunny|
|   Paul|
| Harsha|
|Shubham|
+-------+



### Generating Summary Statistics for a DataFrame


In [None]:
df_pyspark.describe().show()

### Adding a New Column to a DataFrame

In [None]:
df_pyspark=df_pyspark.withColumn('Experience After 2 year',df_pyspark['Experience']+2)
df_pyspark


DataFrame[Name: string, age: string, Experience: string, Salary: string, Experience After 2 year: double]

### Displaying the Updated DataFrame

In [None]:
df_pyspark.show()

+-------+---+----------+------+-----------------------+
|   Name|age|Experience|Salary|Experience After 2 year|
+-------+---+----------+------+-----------------------+
|Sourabh| 31|        10| 30000|                   12.0|
|  Disha| 30|         8| 25000|                   10.0|
|  Sunny| 29|         4| 20000|                    6.0|
|   Paul| 24|         3| 20000|                    5.0|
| Harsha| 21|         1| 15000|                    3.0|
|Shubham| 23|         2| 18000|                    4.0|
+-------+---+----------+------+-----------------------+



### Dropping a Column from a DataFrame: **drop()**

In [None]:
df_pyspark=df_pyspark.drop('Experience After 2 year')
df_pyspark.show()


+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Renaming a Column in a DataFrame - **withColumnRenamed()**

In [None]:
df_pyspark.withColumnRenamed('Name','New Name').show()


+--------+---+----------+------+
|New Name|age|Experience|Salary|
+--------+---+----------+------+
| Sourabh| 31|        10| 30000|
|   Disha| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|  Harsha| 21|         1| 15000|
| Shubham| 23|         2| 18000|
+--------+---+----------+------+



### Working with DataFrames: Reading CSV, Inspecting Schema, and Dropping a Column

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practice').getOrCreate()

df_pyspark=spark.read.csv('test2.csv', header=True, inferSchema=True )
df_pyspark.printSchema()
df_pyspark.show()

df_pyspark.drop('Name').show()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)

+-------+----+----------+------+
|   Name| age|Experience|Salary|
+-------+----+----------+------+
|Sourabh|  31|        10| 30000|
|  Disha|  30|         8| 25000|
|  Sunny|  29|         4| 20000|
|   Paul|  24|         3| 20000|
| Harsha|  21|         1| 15000|
|Shubham|  23|         2| 18000|
| Mahesh|NULL|      NULL| 40000|
|   NULL|  34|        10| 38000|
|   NULL|  36|      NULL|  NULL|
+-------+----+----------+------+

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|NULL|      NULL| 40000|
|  34|        10| 38000|
|  36|      NULL|  NULL|
+----+----------+------+



### Displaying the DataFrame

In [None]:
df_pyspark.show()

### **df_pyspark.na.drop()** - Dropping Rows with All Null Values

No row in this dataset is entirely null, so all nine rows are kept.

In [None]:
# how="all": drop a row only if every value in it is null
df_pyspark.na.drop(how="all").show()

+-------+----+----------+------+
|   Name| age|Experience|Salary|
+-------+----+----------+------+
|Sourabh|  31|        10| 30000|
|  Disha|  30|         8| 25000|
|  Sunny|  29|         4| 20000|
|   Paul|  24|         3| 20000|
| Harsha|  21|         1| 15000|
|Shubham|  23|         2| 18000|
| Mahesh|NULL|      NULL| 40000|
|   NULL|  34|        10| 38000|
|   NULL|  36|      NULL|  NULL|
+-------+----+----------+------+



### Dropping Rows with Any Null Value

In [None]:
# how="any" (the default): drop a row if it contains at least one null
df_pyspark.na.drop(how="any").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Imputing Missing Values Using PySpark's Imputer


In [None]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
    ).setStrategy("median")

# Add imputation cols to df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+-------+----+----------+------+-----------+------------------+--------------+
|   Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+-------+----+----------+------+-----------+------------------+--------------+
|Sourabh|  31|        10| 30000|         31|                10|         30000|
|  Disha|  30|         8| 25000|         30|                 8|         25000|
|  Sunny|  29|         4| 20000|         29|                 4|         20000|
|   Paul|  24|         3| 20000|         24|                 3|         20000|
| Harsha|  21|         1| 15000|         21|                 1|         15000|
|Shubham|  23|         2| 18000|         23|                 2|         18000|
| Mahesh|NULL|      NULL| 40000|         29|                 4|         40000|
|   NULL|  34|        10| 38000|         34|                10|         38000|
|   NULL|  36|      NULL|  NULL|         36|                 4|         20000|
+-------+----+----------+------+-----------+------------------+--------------+

# PySpark DataFrames

`inferSchema=True` tells Spark to scan the data and infer each column's type (for example, `int` for `age`) instead of reading every column as a string. This costs one extra pass over the input.

### Reading and Displaying Data from a CSV File

In [None]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('dataframe').getOrCreate()

df_pyspark=spark.read.csv('test1.csv',header=True,inferSchema=True)
df_pyspark.show()


+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Filtering Data Based on a Condition

In [None]:
df_pyspark.filter("Salary<=20000").show()


+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Filtering and Selecting Specific Columns

In [None]:
df_pyspark.filter("Salary<=20000").select(['Name','age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



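### Filtering Data with Multiple Conditions Using OR

Wrap each condition in parentheses, because Python's `|` and `&` bind more tightly than comparison operators such as `<=`. Every salary in this dataset is either at most 20000 or at least 15000, so this OR filter returns all six rows.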

In [None]:
df_pyspark.filter((df_pyspark['Salary']<=20000) |
                  (df_pyspark['Salary']>=15000)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Filtering Data with Multiple Conditions Using AND

In [None]:
df_pyspark.filter((df_pyspark['Salary']<=20000) &
                  (df_pyspark['Salary']>=15000)).show()

### Creating a SparkSession for Aggregations

In [None]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Agg').getOrCreate()
spark


### Reading Data from a CSV File

In [None]:
df_pyspark=spark.read.csv('test3.csv',header=True,inferSchema=True)
df_pyspark.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|  Sourabh|Data Science| 10000|
|  Sourabh|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|  Sourabh|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



### Printing the Schema of a DataFrame

In [None]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



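### Grouping Data by a Column

`groupBy()` returns a `GroupedData` object; nothing is computed until an aggregation such as `sum()` or `avg()` is applied.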

In [None]:
df_pyspark.groupBy('Name')

GroupedData[grouping expressions: [Name], value: [Name: string, Departments: string ... 1 more field], type: GroupBy]

### Grouping Data and Summing Values

In [None]:
## Groupby
df_pyspark.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|  Sourabh|      19000|
|    Sunny|      12000|
|   Mahesh|       7000|
+---------+-----------+



### Grouping Data and Calculating Average

In [None]:
df_pyspark.groupBy('Name').avg().show()

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|  Sourabh| 6333.333333333333|
|    Sunny|            6000.0|
|   Mahesh|            3500.0|
+---------+------------------+



### Grouping Data and Summing Values by Department

In [None]:

df_pyspark.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



# Examples of PySpark ML

https://spark.apache.org/examples.html

### Creating a SparkSession for the Linear Regression Example

In [None]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Missing').getOrCreate()


### Reading the Dataset and Inspecting It

In [None]:
# Read the dataset
training = spark.read.csv('test1.csv',header=True,inferSchema=True)
training.show()
training.printSchema()


+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|Sourabh| 31|        10| 30000|
|  Disha| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



### Displaying the Column Names of a DataFrame

In [None]:
training.columns

['Name', 'age', 'Experience', 'Salary']

### Assembling Features into a Vector

In [None]:
from pyspark.ml.feature import VectorAssembler

featureassembler = VectorAssembler(
                        inputCols= ["age","Experience"],
                        outputCol= "Independent Features")
output = featureassembler.transform(training)
output.show()

+-------+---+----------+------+--------------------+
|   Name|age|Experience|Salary|Independent Features|
+-------+---+----------+------+--------------------+
|Sourabh| 31|        10| 30000|         [31.0,10.0]|
|  Disha| 30|         8| 25000|          [30.0,8.0]|
|  Sunny| 29|         4| 20000|          [29.0,4.0]|
|   Paul| 24|         3| 20000|          [24.0,3.0]|
| Harsha| 21|         1| 15000|          [21.0,1.0]|
|Shubham| 23|         2| 18000|          [23.0,2.0]|
+-------+---+----------+------+--------------------+



In [None]:
output.columns

['Name', 'age', 'Experience', 'Salary', 'Independent Features']

### Selecting Specific Columns from a DataFrame


In [None]:
finalized_data = output.select("Independent Features","Salary")

In [None]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



### Splitting Data and Fitting a Linear Regression Model


In [None]:
from pyspark.ml.regression import LinearRegression

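# randomSplit is random: pass seed=<int> for a reproducible split.
# With only 6 rows, the test set may contain just one or two rows.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25])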

regressor=LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor=regressor.fit(train_data)


### Accessing the Coefficients of a Linear Regression Model


In [None]:
regressor.coefficients

DenseVector([-64.8464, 1584.7554])

### Accessing the Intercept of a Linear Regression Model


In [None]:
regressor.intercept

15414.10693970355

### Evaluating the Linear Regression Model on Test Data


In [None]:
pred_results=regressor.evaluate(test_data)

### Displaying the Predictions from the Model


In [None]:
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [24.0,3.0]| 20000|18612.059158134216|
+--------------------+------+------------------+




### Spark SQL
- - -

- Spark SQL **provides a DataFrame API** that can perform relational operations on both external data sources and Spark's built-in distributed collections, at scale.

- It supports a wide variety of data sources and algorithms in Big Data, and makes it easy to
  - **add data sources**,
  - **optimization rules**, and
  - **data types** for advanced analytics such as machine learning.

- Spark SQL provides
  - **state-of-the-art SQL performance** and
  - **compatibility with existing Apache Hive structures and components**, such as data formats, user-defined functions (UDFs), and the metastore.

### Useful references for this Notebook
* [PySpark in Jupyter Notebook — Working with Dataframe & JDBC Data Sources](https://medium.com/@thucnc/pyspark-in-jupyter-notebook-working-with-dataframe-jdbc-data-sources-6f3d39300bf6)
* [PySpark - Working with JDBC Sqlite database](http://mitzen.blogspot.com/2017/06/pyspark-working-with-jdbc-sqlite.html)

### Create a SparkSession and Read a Stock Price Dataset CSV

### Installing PySpark


In [None]:
!pip install pyspark



### Importing PySpark SQL Functions


In [None]:
# Note: importing max and min this way shadows Python's built-in max() and min()
from pyspark.sql.functions import max, min, first, last, kurtosis, mean, skewness

### Importing Additional PySpark SQL Functions


In [None]:
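# Note: this shadows Python's built-in sum(); sumDistinct is deprecated since Spark 3.2 in favor of sum_distinct
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sum, sumDistinct, variance, var_samp, var_pop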

### Creating a SparkSession for SQL Operations


In [None]:
from pyspark.sql import SparkSession

spark1 = SparkSession.builder.appName('SQL').getOrCreate()
spark1

### Reading Data from a CSV File into a DataFrame


In [None]:
df = spark1.read.csv('appl_stock.csv', inferSchema=True, header=True )
df

DataFrame[Date: date, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double]

### Printing the Schema of a DataFrame


In [None]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### Creating or Replacing a Temporary View


In [None]:
df.createOrReplaceTempView('stock')

Now run a simple SQL query directly on this view. It returns a DataFrame.

`LIMIT 5` → restricts the result to at most 5 rows.

### Running a SQL Query on the Temporary View


In [None]:
# Equivalent to the SQL statement SELECT * FROM stock; returns a DataFrame
spark1.sql("SELECT * FROM stock")

### Running an SQL Query with LIMIT on the Temporary View


In [None]:
result = spark1.sql("SELECT * FROM stock LIMIT 5")
result

DataFrame[Date: date, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double]

### Displaying the Column Names of the Resulting DataFrame


In [None]:
result.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [None]:
result.show()

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+



### Run slightly more complex queries
How many entries in the `Close` field are higher than 500?

### Counting Rows Based on a Condition in SQL


In [None]:
count_greater_500 = spark1.sql("SELECT COUNT(Close) FROM stock WHERE Close > 500")
count_greater_500.show()

+------------+
|count(Close)|
+------------+
|         403|
+------------+



What is the average `Open` value of all the entries where `Volume` is either greater than 120 million or less than 110 million?

### Calculating the Average with a Conditional Filter


In [None]:
avg_1 = spark1.sql("SELECT AVG(Open) FROM stock WHERE Volume > 120000000 OR Volume < 110000000")
avg_1.show()

+------------------+
|         avg(Open)|
+------------------+
|309.12406365290224|
+------------------+



### Initializing SparkSession


In [None]:
import pandas as pd
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName('app').getOrCreate()
spark

### Loading the Data and Creating Table View

In [None]:
import pandas as pd
mtcars = pd.read_csv('mtcars.csv')
mtcars.head()


Unnamed: 0.1,Unnamed: 0,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb
0,Mazda RX4,21.0,6,160.0,110,3.9,2.62,16.46,0,1,4,4
1,Mazda RX4 Wag,21.0,6,160.0,110,3.9,2.875,17.02,0,1,4,4
2,Datsun 710,22.8,4,108.0,93,3.85,2.32,18.61,1,1,4,1
3,Hornet 4 Drive,21.4,6,258.0,110,3.08,3.215,19.44,1,0,3,1
4,Hornet Sportabout,18.7,8,360.0,175,3.15,3.44,17.02,0,0,3,2


In [None]:
mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )

### Loading the Dataframe into Spark


In [None]:
sdf = spark.createDataFrame(mtcars)

### Creating a Table View

In [None]:
sdf.createOrReplaceTempView("cars")
spark.sql("SELECT * FROM cars").show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|         Unnamed: 0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M

### Selecting a Specific Column

In [None]:
spark.sql("SELECT mpg FROM cars").show(5)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
+----+
only showing top 5 rows



### Basic SQL filtering

In [None]:
# Basic filtering query to find cars with high mileage and a low cylinder count

spark.sql("SELECT * FROM cars WHERE mpg > 20 AND cyl < 6").show(5)

+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Unnamed: 0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|  Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|   Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|   Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



### Aggregation

In [None]:
# Aggregating data and grouping by cylinder count
spark.sql("SELECT COUNT(*), cyl FROM cars GROUP BY cyl").show()

+--------+---+
|count(1)|cyl|
+--------+---+
|       7|  6|
|      14|  8|
|      11|  4|
+--------+---+



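### grouping function

`grouping(col)` returns 1 when `col` has been aggregated away (rolled up) in a row of a `cube()` or `rollup()` result, and 0 when the row is still grouped by that column. This and the following aggregate examples assume `df` is an employee DataFrame with `Department`, `employee_name` and `salary` columns; it is not the stock DataFrame loaded above, and its creation is not shown in this section.

In [None]:
from pyspark.sql.functions import grouping
df.cube("Department", "employee_name").agg(grouping("Department").alias("is_dept_grouped")).show()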

+----------+-------------+---------------+
|Department|employee_name|is_dept_grouped|
+----------+-------------+---------------+
|     Sales|        Jonny|              0|
|     Sales|       Rajesh|              0|
|     Sales|        Sahil|              0|
|   Finance|        Sania|              0|
|     Sales|       Aakash|              0|
|     Sales|         Saif|              0|
| Marketing|        Ankit|              0|
| Marketing|        Kumar|              0|
|   Finance|        Tarun|              0|
|   Finance|         Ajay|              0|
|   Finance|         NULL|              0|
|      NULL|       Aakash|              1|
|      NULL|         NULL|              1|
|      NULL|        Jonny|              1|
|      NULL|        Sania|              1|
|      NULL|       Rajesh|              1|
|      NULL|        Sahil|              1|
|     Sales|         NULL|              0|
| Marketing|         NULL|              0|
|      NULL|         Saif|              1|
+----------
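To make the `cube()` output easier to read, here is a minimal pure-Python sketch of what a cube computes: one grouping set for every subset of the grouping columns, with `None` standing in for SQL `NULL` and a per-column flag that mimics `grouping()`. The `cube` helper and the sample rows are hypothetical illustrations, not part of PySpark, and the only aggregate computed is a row count.

```python
from collections import Counter
from itertools import combinations


def cube(rows, columns):
    """Hypothetical helper mimicking a CUBE count plus grouping() flags.

    Returns (key, count, flags) tuples, one per group in every grouping set.
    Columns outside a grouping set are reported as None (SQL NULL); flags[c]
    is 1 when column c was aggregated away in that row and 0 otherwise.
    """
    results = []
    # A cube over n columns uses all 2**n subsets, from the full set down to ()
    for size in range(len(columns), -1, -1):
        for grouping_set in combinations(columns, size):
            counts = Counter(tuple(row[c] for c in grouping_set) for row in rows)
            for key_values, n in counts.items():
                values = dict(zip(grouping_set, key_values))
                key = {c: values.get(c) for c in columns}
                flags = {c: 0 if c in grouping_set else 1 for c in columns}
                results.append((key, n, flags))
    return results


# Hypothetical sample rows (not the notebook's data)
rows = [
    {"Department": "Sales", "employee_name": "Alice"},
    {"Department": "Sales", "employee_name": "Bob"},
    {"Department": "Finance", "employee_name": "Carol"},
]

for key, n, flags in cube(rows, ["Department", "employee_name"]):
    print(key["Department"], key["employee_name"], n, flags["Department"])
```

Read against the Spark output above, a row such as `Finance | NULL | 0` is a per-department subtotal: `Department` is still a grouping column (flag 0) while `employee_name` has been rolled up.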

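### first function
`first()` returns the first element in a column. When `ignorenulls` is set to `True`, it returns the first non-null element. Because row order is not guaranteed after a shuffle, the result can be non-deterministic.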

In [None]:
df.select(first("salary")).show(truncate=False)

+-------------+
|first(salary)|
+-------------+
|3000         |
+-------------+



### last function
`last()` returns the last element in a column. When `ignorenulls` is set to `True`, it returns the last non-null element. Like `first()`, its result depends on row order and can be non-deterministic.

In [None]:
df.select(last("salary")).show(truncate=False)

+------------+
|last(salary)|
+------------+
|4100        |
+------------+
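The `ignorenulls` flag (the PySpark parameter name, as in `first("salary", ignorenulls=True)`) is easiest to see on a column that contains nulls. Below is a minimal pure-Python sketch of the semantics, assuming the rows arrive in a fixed order; `first_value` and `last_value` are hypothetical helpers, not PySpark functions.

```python
def first_value(values, ignorenulls=False):
    """Hypothetical helper mimicking first(): the first element, or the first non-None one."""
    for v in values:
        if v is not None or not ignorenulls:
            return v
    return None  # empty input, or only nulls with ignorenulls=True


def last_value(values, ignorenulls=False):
    """Hypothetical helper mimicking last(): same rule, scanning from the end."""
    return first_value(list(reversed(values)), ignorenulls)


# Hypothetical column values, in a fixed row order
salaries = [None, 3000, 4600, None]
print(first_value(salaries))                    # None
print(first_value(salaries, ignorenulls=True))  # 3000
print(last_value(salaries))                     # None
print(last_value(salaries, ignorenulls=True))   # 4600
```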



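### kurtosis function

- Kurtosis is a statistical measure that describes the shape, or "tailedness," of a distribution.
- `kurtosis()` returns the kurtosis of the values in a group. Spark reports *excess* kurtosis, so a normal distribution scores about 0 and a negative value, like the one below, indicates lighter tails than a normal distribution.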


In [None]:
df.select(kurtosis("salary")).show(truncate=False)

+-------------------+
|kurtosis(salary)   |
+-------------------+
|-0.6467803030303032|
+-------------------+
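The figure above can be reproduced by hand. A minimal sketch, assuming Spark computes population excess kurtosis: the fourth central moment divided by the squared second central moment, minus 3, with both moments using divisor n. The salary list is hypothetical: the notebook does not show how `df` was built, but these ten values are consistent with the sum, mean, min, max and variance outputs printed in this section. `math.fsum` is used instead of `sum` because the notebook's `from pyspark.sql.functions import sum` shadows the built-in.

```python
from math import fsum


def central_moment(values, k):
    """k-th central moment with divisor n (population form)."""
    n = len(values)
    mean = fsum(values) / n
    return fsum((x - mean) ** k for x in values) / n


def excess_kurtosis(values):
    """Population excess kurtosis: m4 / m2**2 - 3 (about 0 for a normal distribution)."""
    m2 = central_moment(values, 2)
    return central_moment(values, 4) / m2 ** 2 - 3


# Hypothetical salaries, consistent with the outputs printed in this section
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]
print(excess_kurtosis(salaries))
```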



### max function
- `max()` returns the maximum value in a column.

In [None]:
df.select(max("salary")).show(truncate=False)

+-----------+
|max(salary)|
+-----------+
|4600       |
+-----------+



### min function
- `min()` returns the minimum value in a column.

In [None]:
df.select(min("salary")).show(truncate=False)

+-----------+
|min(salary)|
+-----------+
|2000       |
+-----------+



### mean function
- `mean()`, an alias of `avg()`, returns the average of the values in a column.

In [None]:
df.select(mean("salary")).show(truncate=False)

+-----------+
|avg(salary)|
+-----------+
|3400.0     |
+-----------+



### skewness function
- Skewness measures the **asymmetry** of a distribution: a negative value means the left tail is longer, and a positive value means the right tail is longer.
- `skewness()` **returns the skewness of the values** in a group.

In [None]:
df.select(skewness("salary")).show(truncate=False)

+--------------------+
|skewness(salary)    |
+--------------------+
|-0.12041791181069571|
+--------------------+
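A minimal sketch of the calculation, assuming Spark returns population skewness: the third central moment divided by the second central moment raised to 1.5, both with divisor n. The salary list is a hypothetical assumption (the notebook does not show the real data) chosen to be consistent with the outputs printed in this section. The helper is named `population_skewness` so it does not clash with the `skewness` imported from PySpark, and it uses `math.fsum` because the notebook shadows the built-in `sum`.

```python
from math import fsum


def population_skewness(values):
    """Population skewness: m3 / m2**1.5, with central moments using divisor n."""
    n = len(values)
    mean = fsum(values) / n
    m2 = fsum((x - mean) ** 2 for x in values) / n
    m3 = fsum((x - mean) ** 3 for x in values) / n
    return m3 / m2 ** 1.5


# Hypothetical salaries, consistent with the outputs printed in this section
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]
print(population_skewness(salaries))
```

The negative result comes mostly from the single low salary of 2000, which stretches the left tail.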



### stddev(), stddev_samp() and stddev_pop()
- `stddev()` is an alias for `stddev_samp()`.
- `stddev_samp()` returns the sample standard deviation (divisor n - 1) of the values in a column.
- `stddev_pop()` returns the population standard deviation (divisor n) of the values in a column.

In [None]:
df.select(stddev("salary"), stddev_samp("salary"), stddev_pop("salary")).show(truncate=False)

+-----------------+-------------------+------------------+
|stddev(salary)   |stddev_samp(salary)|stddev_pop(salary)|
+-----------------+-------------------+------------------+
|765.9416862050705|765.9416862050705  |726.636084983398  |
+-----------------+-------------------+------------------+
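The two columns differ only in the divisor: n - 1 for the sample statistic and n for the population statistic. As a cross-check, Python's standard `statistics` module draws the same distinction with `stdev` and `pstdev`. The salary list is a hypothetical assumption chosen to be consistent with the printed outputs; the notebook does not show the real data.

```python
import statistics

# Hypothetical salaries, consistent with the outputs printed in this section
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]

sample_sd = statistics.stdev(salaries)       # divisor n - 1, like stddev_samp()
population_sd = statistics.pstdev(salaries)  # divisor n, like stddev_pop()
print(sample_sd, population_sd)
```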



### sum function
`sum()` returns the sum of all values in a column.

In [None]:
df.select(sum("salary")).show(truncate=False)

+-----------+
|sum(salary)|
+-----------+
|34000      |
+-----------+



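### sumDistinct function
`sumDistinct()` returns the **sum of all distinct values** in a column. It has been deprecated since Spark 3.2 in favor of `sum_distinct()`, which behaves the same way.

In [None]:
df.select(sumDistinct("salary")).show(truncate=False)
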
+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+
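A minimal pure-Python sketch of the difference between a plain sum and a distinct sum, assuming a hypothetical salary list consistent with the outputs printed in this section (the notebook does not show the real data). `distinct_total` is a hypothetical helper; like Spark's sum functions it skips nulls, and it uses `math.fsum` because the notebook shadows the built-in `sum`.

```python
from math import fsum


def distinct_total(values):
    """Hypothetical helper mimicking sum_distinct(): add each distinct non-null value once."""
    return fsum({v for v in values if v is not None})


# Hypothetical salaries, consistent with the outputs printed in this section
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]
print(fsum(salaries))            # 34000.0, every row counts
print(distinct_total(salaries))  # 20900.0, 3000 and 4100 are counted only once
```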



### variance(), var_samp(), var_pop()
- `variance()` is an alias for `var_samp()`, which is why the first two output columns below are both labeled `var_samp(salary)`.
- `var_samp()` returns the unbiased (sample) variance of the values in a column.
- `var_pop()` returns the population variance of the values in a column.

In [None]:
df.select(variance("salary"),var_samp("salary"),var_pop("salary")).show(truncate=False)

+-----------------+-----------------+---------------+
|var_samp(salary) |var_samp(salary) |var_pop(salary)|
+-----------------+-----------------+---------------+
|586666.6666666666|586666.6666666666|528000.0       |
+-----------------+-----------------+---------------+
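Both variances can be written out directly; the only difference is dividing the sum of squared deviations by n - 1 or by n. A minimal sketch under two assumptions: the salary list is hypothetical (chosen to match the printed outputs), and, as we understand Spark 3.x, `var_samp()` returns NULL for fewer than two rows, which is mirrored here with `None`.

```python
from math import fsum


def sample_variance(values):
    """Unbiased variance (divisor n - 1), like var_samp()/variance().

    Returns None for fewer than two values, mirroring the NULL Spark 3.x returns there.
    """
    n = len(values)
    if n < 2:
        return None
    mean = fsum(values) / n
    return fsum((x - mean) ** 2 for x in values) / (n - 1)


def population_variance(values):
    """Population variance (divisor n), like var_pop(); None for an empty input."""
    n = len(values)
    if n == 0:
        return None
    mean = fsum(values) / n
    return fsum((x - mean) ** 2 for x in values) / n


# Hypothetical salaries, consistent with the outputs printed in this section
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]
print(sample_variance(salaries), population_variance(salaries))
```

With ten rows the two results differ by a factor of 10/9, and the gap shrinks as the number of rows grows.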



In [None]:
!pip install pyspark



In [None]:
from pyspark.sql.functions import *

### PySpark SQL Date Functions

| PYSPARK DATE FUNCTION   | DATE FUNCTION DESCRIPTION                                                                                   |
|-------------------------|-----------------------------------------------------------------------------------------------------------|
| current_date()          | Returns the current date as a date column.                                                                  |
| date_format(dateExpr, format) | Converts a date/timestamp/string to a value of string in the format specified by the date format given by the second argument. |
| to_date()               | Converts the column into `DateType` by casting rules to `DateType`.                                         |
| to_date(column, fmt)    | Converts the column into a `DateType` with a specified format.                                               |
| add_months(column, numMonths) | Returns the date that is `numMonths` months after the date in `column`.                                     |
| date_add(column, days)  | Returns the date that is `days` days after the date in `column`.                                                 |
| date_sub(column, days)  | Returns the date that is `days` days before the date in `column`.                                                |
| datediff(end, start)    | Returns the number of days from `start` to `end`.                                                            |
| months_between(end, start) | Returns the number of months between dates `start` and `end`; the result is positive when `end` is later than `start`. |
| months_between(end, start, roundOff) | Returns the number of months between dates `end` and `start`. If `roundOff` is set to true, the result is rounded off to 8 digits; otherwise, it is not rounded. |
| next_day(column, dayOfWeek) | Returns the first date that is later than the date in `column` and falls on the specified day of the week. |
| trunc(column, format)   | Returns date truncated to the unit specified by the format.                                                  |
| date_trunc(format, timestamp) | Returns timestamp truncated to the unit specified by the format.                                             |
| year(column)            | Extracts the year as an integer from a given date/timestamp/string.                                          |
| quarter(column)         | Extracts the quarter as an integer from a given date/timestamp/string.                                       |
| month(column)           | Extracts the month as an integer from a given date/timestamp/string.                                         |
| dayofweek(column)       | Extracts the day of the week as an integer from a given date/timestamp/string. Ranges from 1 for Sunday through 7 for Saturday. |
| dayofmonth(column)      | Extracts the day of the month as an integer from a given date/timestamp/string.                              |
| dayofyear(column)       | Extracts the day of the year as an integer from a given date/timestamp/string.                               |
| weekofyear(column)      | Extracts the week number as an integer from a given date/timestamp/string. A week is considered to start on a Monday, and week 1 is the first week with more than 3 days, as defined by ISO 8601. |
| last_day(column)        | Returns the last day of the month which the given date belongs to. For example, input "2015-07-27" returns "2015-07-31" since July 31 is the last day of the month in July 2015. |
| from_unixtime(column)   | Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the yyyy-MM-dd HH:mm:ss format. |
| from_unixtime(column, f) | Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format. |
| unix_timestamp()        | Returns the current Unix timestamp (in seconds) as a long.                                                  |
| unix_timestamp(column)  | Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale. |
| unix_timestamp(column, p) | Converts time string with the given pattern to Unix timestamp (in seconds).                                  |
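Several rows of the table are easy to check against Python's standard `datetime` and `calendar` modules. The sketch below mirrors `datediff`, `add_months`, `last_day`, `dayofweek` and `weekofyear` for plain dates; it is an illustration, not Spark's implementation. The `py_` prefix is a hypothetical naming choice that keeps these helpers from clashing with the `from pyspark.sql.functions import *` in the notebook. The `add_months` clamping follows Spark 3.x as we understand it: a month-end start date is not moved to the end of the target month unless its day does not exist there.

```python
import calendar
from datetime import date


def py_datediff(end, start):
    """Like datediff(end, start): whole days from start to end."""
    return (end - start).days


def py_add_months(d, num_months):
    """Like add_months(): shift by whole months, clamping to the target month's length."""
    month_index = d.month - 1 + num_months
    year = d.year + month_index // 12
    month = month_index % 12 + 1
    day = min(d.day, calendar.monthrange(year, month)[1])
    return date(year, month, day)


def py_last_day(d):
    """Like last_day(): the last date of d's month."""
    return date(d.year, d.month, calendar.monthrange(d.year, d.month)[1])


def py_dayofweek(d):
    """Like dayofweek(): 1 = Sunday ... 7 = Saturday (isoweekday() uses 1 = Monday ... 7 = Sunday)."""
    return d.isoweekday() % 7 + 1


def py_weekofyear(d):
    """Like weekofyear(): the ISO 8601 week number."""
    return d.isocalendar()[1]


print(py_last_day(date(2015, 7, 27)))                   # 2015-07-31
print(py_add_months(date(2015, 1, 31), 1))              # 2015-02-28
print(py_datediff(date(2021, 3, 1), date(2020, 2, 1)))  # 394
print(py_dayofweek(date(2020, 2, 1)))                   # 7 (a Saturday)
```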


### Creating a DataFrame and Displaying Data


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


spark = SparkSession.builder \
            .appName('SparkByExamples.com') \
            .getOrCreate()
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","date"])
df.show()


+---+----------+
| id|      date|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



### current_date()

In [None]:
df.select(current_date().alias("current_date")  ).show(1)

+------------+
|current_date|
+------------+
|  2023-12-27|
+------------+
only showing top 1 row



### date_format()

In [None]:
df.select(col("date"),
    date_format(col("date"), "MM-dd-yyyy").alias("date_format")
  ).show()

+----------+-----------+
|      date|date_format|
+----------+-----------+
|2020-02-01| 02-01-2020|
|2019-03-01| 03-01-2019|
|2021-03-01| 03-01-2021|
+----------+-----------+



### to_date()

In [None]:
df.select(col("date"),
    to_date(col("date"), "yyyy-MM-dd").alias("to_date")
  ).show()

+----------+----------+
|      date|   to_date|
+----------+----------+
|2020-02-01|2020-02-01|
|2019-03-01|2019-03-01|
|2021-03-01|2021-03-01|
+----------+----------+
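Spark's datetime patterns (`yyyy`, `MM`, `dd`) play the same role as `strftime` codes in Python. The sketch below translates only the three pattern letters used above, so it is deliberately tiny and not a general converter; `to_strftime`, `py_to_date` and `py_date_format` are hypothetical helpers. It also mimics `to_date()` returning `None` for a string that does not match, which is how Spark behaves with ANSI mode disabled (the Spark 3.x default), whereas `strptime` would raise `ValueError`.

```python
from datetime import date, datetime

# Hypothetical translation table covering only the pattern letters used above
SPARK_TO_STRFTIME = [("yyyy", "%Y"), ("MM", "%m"), ("dd", "%d")]


def to_strftime(pattern):
    """Translate a very small subset of Spark datetime patterns to strftime codes."""
    for spark_token, py_token in SPARK_TO_STRFTIME:
        pattern = pattern.replace(spark_token, py_token)
    return pattern


def py_to_date(text, pattern="yyyy-MM-dd"):
    """Like to_date(col, fmt): parse a string, returning None when it does not match."""
    try:
        return datetime.strptime(text, to_strftime(pattern)).date()
    except ValueError:
        return None


def py_date_format(d, pattern):
    """Like date_format(col, fmt): render a date as a string in the given pattern."""
    return d.strftime(to_strftime(pattern))


for raw in ["2020-02-01", "2019-03-01", "2021-03-01"]:
    parsed = py_to_date(raw)
    print(raw, parsed, py_date_format(parsed, "MM-dd-yyyy"))
```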



### Installing MySQL Python Connectors


In [None]:
#pip install pymysql
!pip install mysql-connector-python


Collecting mysql-connector-python
  Downloading mysql_connector_python-9.3.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (7.2 kB)
Downloading mysql_connector_python-9.3.0-cp311-cp311-manylinux_2_28_x86_64.whl (33.9 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 33.9/33.9 MB 51.5 MB/s eta 0:00:00
Installing collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.3.0


### Importing MySQL Connector for Python


In [None]:
import mysql.connector as sql

### Connecting to a MySQL Database


In [None]:
# Ensure you have created a database named "company" by logging into the MySQL instance

db = sql.connect(
    host='localhost',
    user='root',
    password='password',
    database='company'
)


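If no MySQL server is running on `localhost:3306`, this cell fails with `InterfaceError: 2003: Can't connect to MySQL server on 'localhost:3306' (Errno 111: Connection refused)`. Start the server before running the remaining cells.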

### Executing SQL Commands with MySQL Connector


In [None]:
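mycursor = db.cursor()
mycursor.execute("SELECT VERSION()")
print(mycursor.fetchone())  # read the result before running another statement on this cursor

# Drop the table if it already exists, using the execute() method.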
mycursor.execute("DROP TABLE IF EXISTS EMPLOYEE")

# Create table as per the requirement
sql = """CREATE TABLE EMPLOYEE (
         FIRST_NAME  CHAR(20) NOT NULL,
         LAST_NAME  CHAR(20),
         AGE INT,
         SEX CHAR(1),
         INCOME FLOAT )"""

mycursor.execute(sql)

### INSERT operation on EMPLOYEE table

In [None]:
sql = """INSERT INTO EMPLOYEE
         VALUES ('Mac', 'Beth', 20, 'M', 2000)"""
try:
    mycursor.execute(sql)
    db.commit()
except Exception as err:
    db.rollback()
    print("Insert failed, rolled back:", err)

In [None]:
sql = """INSERT INTO EMPLOYEE
         VALUES ('Kathy', 'Moss', 20, 'F', 1000)"""
try:
    mycursor.execute(sql)
    db.commit()
except Exception as err:
    db.rollback()
    print("Insert failed, rolled back:", err)

### Retrieving all the data from the EMPLOYEE table

In [None]:
query = "SELECT * FROM EMPLOYEE"
mycursor.execute(query)
data3 = mycursor.fetchall()
print (data3)

[('Mac', 'Beth', 20, 'M', 2000.0), ('Kathy', 'Moss', 20, 'F', 1000.0)]


In [None]:
for x in data3:
    print(x)

('Mac', 'Beth', 20, 'M', 2000.0)
('Kathy', 'Moss', 20, 'F', 1000.0)


### Retrieving the data from the 'EMPLOYEE' table based on the condition

In [None]:
# Fetch the rows where INCOME is less than 2000, passing the value as a query parameter
sql = "SELECT * FROM EMPLOYEE WHERE INCOME < %s"
mycursor.execute(sql, (2000,))
res = mycursor.fetchall()
print(res)

[('Kathy', 'Moss', 20, 'F', 1000.0)]
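Passing values as query parameters, rather than splicing them into the SQL string with `%` formatting, lets the database driver handle quoting and protects against SQL injection. The sketch below uses Python's built-in `sqlite3` module with an in-memory database as a stand-in for the MySQL server (an assumption made so it runs without one). Note that `sqlite3` uses `?` placeholders, whereas `mysql.connector` uses `%s`.

```python
import sqlite3

# In-memory SQLite database standing in for the MySQL "company" database
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE EMPLOYEE (FIRST_NAME TEXT NOT NULL, LAST_NAME TEXT, "
    "AGE INTEGER, SEX TEXT, INCOME REAL)"
)
cur.executemany(
    "INSERT INTO EMPLOYEE VALUES (?, ?, ?, ?, ?)",
    [("Mac", "Beth", 20, "M", 2000.0), ("Kathy", "Moss", 20, "F", 1000.0)],
)
conn.commit()

# The value travels separately from the SQL text; the driver quotes it safely
cur.execute("SELECT * FROM EMPLOYEE WHERE INCOME < ?", (2000,))
low_income = cur.fetchall()
print(low_income)  # [('Kathy', 'Moss', 20, 'F', 1000.0)]

# A malicious-looking value is treated as plain data, so it matches nothing
cur.execute("SELECT * FROM EMPLOYEE WHERE LAST_NAME = ?", ("Moss' OR '1'='1",))
print(cur.fetchall())  # []
```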


### Update Operation

In [None]:
# Prepare an SQL query to UPDATE the required records
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = %s"
try:
    mycursor.execute(sql, ('M',))
    db.commit()
except Exception as err:
    db.rollback()
    print("Update failed, rolled back:", err)

In [None]:
sql = "SELECT * FROM EMPLOYEE"
mycursor.execute(sql)
upd = mycursor.fetchall()
print(upd)

[('Mac', 'Beth', 21, 'M', 2000.0), ('Kathy', 'Moss', 20, 'F', 1000.0)]


### Delete Operation

In [None]:
# Delete all the rows where AGE is greater than 20
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s"
try:
    mycursor.execute(sql, (20,))
    db.commit()
except Exception as err:
    db.rollback()
    print("Delete failed, rolled back:", err)

In [None]:
sql = "SELECT * FROM EMPLOYEE"
mycursor.execute(sql)
upd = mycursor.fetchall()
print(upd)

[('Kathy', 'Moss', 20, 'F', 1000.0)]
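The `try`/`commit`/`rollback` pattern used for the insert, update and delete cells keeps each change all-or-nothing. A minimal sketch of the same idea, using the standard-library `sqlite3` module as a stand-in for MySQL (an assumption made so the example runs without a server): a `sqlite3` connection used as a context manager commits on success and rolls back if an exception escapes. `run_in_transaction` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE EMPLOYEE (FIRST_NAME TEXT NOT NULL, LAST_NAME TEXT, "
    "AGE INTEGER, SEX TEXT, INCOME REAL)"
)
with conn:  # commits both inserts together
    conn.executemany(
        "INSERT INTO EMPLOYEE VALUES (?, ?, ?, ?, ?)",
        [("Mac", "Beth", 20, "M", 2000.0), ("Kathy", "Moss", 20, "F", 1000.0)],
    )


def run_in_transaction(conn, statements):
    """Hypothetical helper: run (statement, params) pairs atomically.

    Returns True if everything was committed, False if it was rolled back.
    """
    try:
        with conn:  # commit on success, roll back if an exception escapes
            for statement, params in statements:
                conn.execute(statement, params)
        return True
    except sqlite3.Error as err:
        print("Rolled back:", err)
        return False


# Succeeds: Mac's age goes from 20 to 21
updated = run_in_transaction(conn, [("UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = ?", ("M",))])

# Fails on the second statement (NOT NULL violation), so the DELETE is undone too
deleted = run_in_transaction(conn, [
    ("DELETE FROM EMPLOYEE WHERE AGE > ?", (20,)),
    ("INSERT INTO EMPLOYEE VALUES (?, ?, ?, ?, ?)", (None, "Doe", 30, "F", 500.0)),
])

rows = conn.execute("SELECT FIRST_NAME, AGE FROM EMPLOYEE ORDER BY FIRST_NAME").fetchall()
print(updated, deleted, rows)  # True False [('Kathy', 20), ('Mac', 21)]
```

Grouping related statements this way means a failure halfway through never leaves the table partially modified.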


### Checking all the databases

In [None]:
mycursor.execute("show databases")
for i in mycursor:
    print(i)

('admissions',)
('company',)
('hr',)
('information_schema',)
('mycart',)
('myclass',)
('mysql',)
('performance_schema',)
('shop',)
('sys',)
