# <center>ETL: Extract</center>

In ETL, extraction is the step that retrieves data from source systems (databases, files, APIs) and loads it into memory for processing. It involves identifying the source systems, determining which data to extract, and accessing the source data. Depending on the source, the data engineer may connect to a database, read from a file system, or consume data from an API.

# Extracting with Pandas

### CSV Files

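Data can be extracted from unstructured text files (e.g. the text of a book) or structured text files (e.g. CSV). For CSV files, pd.read_csv() parses the rows into a DataFrame, using the first row as column names by default.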

```python
import pandas as pd

df = pd.read_csv("/path/to/data.csv")
```
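To make the structured versus unstructured distinction concrete, here is a minimal sketch that uses small in-memory strings instead of real files. The sample data and the column names (id, name, score, line) are hypothetical. The CSV text is parsed into typed columns, while the unstructured text has no schema and is simply kept as one row per line.

```python
import io

import pandas as pd

# Structured text: a small CSV sample (hypothetical data) held in memory instead of a file.
csv_text = "id,name,score\n1,Ada,91.5\n2,Linus,78.0\n"
df = pd.read_csv(io.StringIO(csv_text))  # header row becomes column names, types are inferred

# Unstructured text: there are no columns to parse, so each line is kept as raw text.
# With a real file this would be open("book.txt").read().
book_text = "Chapter 1\nThe data arrived late that evening.\n"
text_df = pd.DataFrame({"line": book_text.splitlines()})
```

Any further structure in unstructured text (sentences, keywords, dates) has to be extracted by your own parsing logic.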

### JSON Files

Many web services exchange data as JSON, a semi-structured format made of nested objects and arrays. <br>
JSON files can be parsed into Python objects and converted to a Pandas DataFrame for analysis.

```python
import pandas as pd

# Read the JSON file into a Pandas DataFrame
df = pd.read_json('path/to/json/file')
```
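By default pd.read_json() expects the whole input to be a single JSON document. Many APIs and log pipelines instead write JSON Lines (one JSON object per line), which needs lines=True. A small sketch with hypothetical in-memory records shows that both layouts end up as the same DataFrame:

```python
import io

import pandas as pd

# A JSON array of records: one document, read with the default settings.
records_json = '[{"id": 1, "city": "Oslo"}, {"id": 2, "city": "Lima"}]'
df_records = pd.read_json(io.StringIO(records_json))

# JSON Lines: one JSON object per line, which requires lines=True.
jsonl = '{"id": 1, "city": "Oslo"}\n{"id": 2, "city": "Lima"}\n'
df_lines = pd.read_json(io.StringIO(jsonl), lines=True)
```

The io.StringIO wrappers stand in for file paths. Recent pandas versions warn when a literal JSON string is passed directly.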

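Note: If your JSON file has nested objects, pd.read_json() keeps them as Python dictionaries inside individual cells. To flatten them into separate columns, load the file with the json module and pass the result to the pd.json_normalize() function, using its arguments as needed.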

```python
import json
import pandas as pd

# Load the JSON file into a Python object
with open('data.json', 'r') as f:
    data = json.load(f)

# Convert the Python object into a pandas DataFrame
df = pd.json_normalize(data)
```
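The "appropriate arguments" for json_normalize() usually come down to record_path and meta. A sketch using a hypothetical payload (customers with a nested address and a list of orders): without arguments, nested objects become dotted column names; with record_path, a nested list is exploded into one row per element, and meta copies parent fields onto each row.

```python
import pandas as pd

# Hypothetical payload: each customer holds a nested address and a list of orders.
data = [
    {"customer": "Ada", "address": {"city": "London", "zip": "N1"},
     "orders": [{"order_id": 1, "total": 9.5}, {"order_id": 2, "total": 3.0}]},
    {"customer": "Linus", "address": {"city": "Helsinki", "zip": "00100"},
     "orders": [{"order_id": 3, "total": 12.0}]},
]

# Nested objects are flattened into dotted column names such as "address.city".
# The "orders" list is left as-is in a single column.
customers = pd.json_normalize(data)

# record_path explodes the nested list into one row per order;
# meta carries parent fields (including nested ones) onto each row.
orders = pd.json_normalize(
    data,
    record_path="orders",
    meta=["customer", ["address", "city"]],
)
```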

### APIs

The internet allows communication between clients (like web browsers or applications) and servers (computers that host data and services) using the HTTP protocol. A client sends a request to a URL that identifies the resource, together with an HTTP method that specifies the type of request (e.g. GET, POST, PUT, DELETE) and any necessary data or parameters. The server processes the request and sends back a response containing a status code and data or other content.

Web APIs (Application Programming Interfaces) are services exposed by servers that return data in a standardized format (usually JSON or XML), so that software applications can communicate with each other.
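JSON responses map directly onto Python dictionaries and lists, while XML responses need an XML parser first. A minimal sketch, assuming a hypothetical XML body with made-up element names, uses the standard library's xml.etree.ElementTree to turn each element into one DataFrame row:

```python
import xml.etree.ElementTree as ET

import pandas as pd

# Hypothetical XML body returned by an API.
xml_body = """
<products>
  <product id="1"><name>Pen</name><price>1.50</price></product>
  <product id="2"><name>Notebook</name><price>3.20</price></product>
</products>
"""

root = ET.fromstring(xml_body.strip())

# Turn each <product> element into a dict: the id attribute plus the child elements' text.
rows = [
    {
        "id": int(product.get("id")),
        "name": product.findtext("name"),
        "price": float(product.findtext("price")),
    }
    for product in root.findall("product")
]
df = pd.DataFrame(rows)
```

For flat XML, pandas 1.3 and later also provide pd.read_xml(), which does this mapping in one call.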

Python's Requests library simplifies making HTTP requests from a Python program. A call such as requests.get() builds the request from the URL and any parameters, sends it to the server, and returns a Response object. Its attributes and methods, such as status_code, headers and json(), give access to the response data and other information.

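```python
import requests
import pandas as pd

# Send a GET request to the API endpoint you want to access (timeout in seconds)
response = requests.get("https://api.example.com/data", timeout=10)

# Raise an error for 4xx/5xx responses instead of parsing an error body
response.raise_for_status()

# Parse response data as JSON (a Python dict or list)
data = response.json()

# Convert the JSON data to a Pandas DataFrame
# (a list of records gives one row per record; use pd.json_normalize() for nested data)
df = pd.DataFrame(data)
```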
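The requests.get() call above hides two steps: building the request and sending it. A sketch that makes them explicit, assuming the placeholder endpoint from the example and hypothetical query parameters (page, limit): preparing the request shows the exact URL that would be sent, and the network call only happens when the hypothetical send() helper is called.

```python
import requests

API_URL = "https://api.example.com/data"  # placeholder endpoint

# Build the request explicitly: method, URL and query parameters (hypothetical names).
request = requests.Request("GET", API_URL, params={"page": 2, "limit": 100})
prepared = request.prepare()
print(prepared.method, prepared.url)


def send(prepared_request: requests.PreparedRequest) -> object:
    """Hypothetical helper: send a prepared request and return the decoded JSON body."""
    with requests.Session() as session:
        response = session.send(prepared_request, timeout=10)
    # Raise an HTTPError for 4xx/5xx status codes instead of parsing an error page.
    response.raise_for_status()
    return response.json()
```

In everyday code, requests.get(API_URL, params=..., timeout=...) does the same in one call; the explicit form is mainly useful for debugging how parameters are encoded.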

### Databases

Databases are probably the most common source for extraction. Connecting to one requires a connection string (or URI) that describes how to reach the database. <br>
It typically contains the database type (e.g. PostgreSQL), username and password, host, port and database name, in the form dialect://user:password@host:port/database. The connection string is used to create a database engine, which can be passed to packages like Pandas to query the data in the database.
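A hand-written URI breaks when the password contains characters such as @ or /. A sketch, assuming SQLAlchemy 1.4 or later and placeholder credentials, builds the URL from its parts with URL.create(), which escapes those characters when the URL is rendered:

```python
from sqlalchemy.engine import URL

# Placeholder connection details; replace with your own.
url = URL.create(
    drivername="postgresql",
    username="user",
    password="p@ss/word",  # contains characters that must be escaped in a URI
    host="localhost",
    port=5432,
    database="my_database",
)

# The rendered string percent-encodes the password.
print(url.render_as_string(hide_password=False))

# The URL object can be passed straight to create_engine
# (this needs a PostgreSQL driver such as psycopg2 installed):
# engine = sqlalchemy.create_engine(url)
```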

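```python
import sqlalchemy
import pandas as pd

# Create connection (placeholder credentials; the URI ends with the database name)
# A PostgreSQL driver such as psycopg2 must be installed.
connection_uri = "postgresql://user:password@localhost:5432/my_database"
db_engine = sqlalchemy.create_engine(connection_uri)

# Pandas .read_sql() method
df = pd.read_sql("SELECT * FROM my_table", db_engine)
```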
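The same engine-plus-read_sql pattern can be tried without a database server by using an in-memory SQLite database, which SQLAlchemy supports out of the box. A sketch with a hypothetical orders table: it loads sample rows, extracts them with a parameterized query, and then reads the table in batches with chunksize.

```python
import pandas as pd
import sqlalchemy

# In-memory SQLite database as a stand-in for the PostgreSQL connection above.
engine = sqlalchemy.create_engine("sqlite://")

# Load some hypothetical sample rows so there is something to extract.
pd.DataFrame(
    {"order_id": [1, 2, 3, 4], "amount": [10.0, 250.0, 75.5, 500.0]}
).to_sql("orders", engine, index=False)

# Parameterized query: the value is bound by the driver instead of pasted into the SQL string.
query = sqlalchemy.text("SELECT * FROM orders WHERE amount > :min_amount")
large_orders = pd.read_sql(query, engine, params={"min_amount": 50})

# chunksize turns the result into an iterator of DataFrames with at most 2 rows each.
chunks = list(pd.read_sql("SELECT * FROM orders", engine, chunksize=2))
```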

### Bonus: Batch Processing

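Batch processing is a way of processing data in batches or chunks, rather than all at once. It is useful when working with large datasets that may not fit into memory. <br>
Pandas supports batch processing through the chunksize parameter of readers such as read_csv() and read_sql(), which then return an iterator of DataFrames instead of one large DataFrame. Each chunk can be processed and written out on its own. Note that if the output is written in append mode (mode='a'), as in the example below, running the code multiple times appends the same rows again and produces duplicate data in the output file.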

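```python
import pandas as pd

# Read the CSV file in batches of 1000 rows; each chunk is a DataFrame
for i, chunk in enumerate(pd.read_csv('input_file.csv', chunksize=1000)):
    # Do some processing on the chunk (adding 1 assumes every column is numeric)
    processed_chunk = chunk.apply(lambda x: x + 1)

    # Append the processed chunk to the output CSV file, writing the header with the first chunk only.
    # Because of mode='a', rerunning this code appends the rows (and the header) again.
    processed_chunk.to_csv('output_file.csv', mode='a', index=False, header=(i == 0))
```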
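One way to avoid the duplicate rows on reruns, assuming the output should be rebuilt from scratch each time, is to open the output file once in write mode and let every chunk write to that open handle. The sketch below generates its own input in a temporary directory; the file names and the +1 transformation are placeholders.

```python
import tempfile
from pathlib import Path

import pandas as pd


def process_in_chunks(input_path: Path, output_path: Path, chunksize: int = 1000) -> None:
    """Process a CSV in chunks, overwriting any previous output so reruns don't duplicate rows."""
    # Mode "w" truncates the file once per run; chunks are then written to the open handle.
    # pandas recommends newline="" for text handles passed to to_csv().
    with open(output_path, "w", newline="") as out:
        for i, chunk in enumerate(pd.read_csv(input_path, chunksize=chunksize)):
            processed = chunk + 1  # example transformation; assumes numeric columns
            processed.to_csv(out, index=False, header=(i == 0))


with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "input_file.csv"
    dst = Path(tmp) / "output_file.csv"
    pd.DataFrame({"a": range(2500), "b": range(2500)}).to_csv(src, index=False)

    # Running twice leaves the same output, with no duplicated rows.
    process_in_chunks(src, dst)
    process_in_chunks(src, dst)
    result = pd.read_csv(dst)
```

If a partially written file is a concern, a variant is to write to a temporary file and rename it over the final output once all chunks succeed.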

# Extracting with PySpark

In PySpark, you can extract data from a variety of sources through the read property of a SparkSession object, which returns a DataFrameReader. A SparkSession can be created like this: <br>

```SparkSession.builder.appName("name_of_app").getOrCreate()``` 

1. SparkSession.builder: This creates a Builder object that is used to configure and create a SparkSession.

2. appName("name_of_app"): This sets the name of the application, which is used to identify the Spark application in the Spark UI and logs.

3. getOrCreate(): This method returns the existing SparkSession if one is already active in the current process, and creates a new one otherwise. This is useful when the same session should be reused across multiple modules of one application or across cells in a notebook.

A SparkSession can be used to interact with Spark and perform distributed data processing. <br>
Once the SparkSession is created, you can use it to read and write data, create DataFrames, perform transformations, and run Spark jobs.

### CSV Files

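```python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("ETL").getOrCreate()

# Read a CSV file into a DataFrame.
# header=True uses the first row as column names; inferSchema=True guesses column types
# (without it, columns are named _c0, _c1, ... and every value is read as a string).
df = spark.read.csv("path/to/csv/file", header=True, inferSchema=True)
```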

### JSON Files

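Unlike Pandas' ```.read_json()``` function, which keeps nested objects as Python dictionaries inside cells, PySpark's DataFrameReader infers nested JSON objects as struct columns whose fields can be selected with dot notation (e.g. user.name). By default, spark.read.json() expects JSON Lines, i.e. one complete JSON object per line; a file containing a single multi-line JSON document needs the multiLine=True option.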

```python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("ETL").getOrCreate()

# Read a JSON file into a DataFrame
# (by default each line must hold one complete JSON object)
df = spark.read.json("path/to/json/file")
```

### APIs

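To extract data from an API using PySpark, you can use the requests library to make HTTP requests to the API and then convert the parsed response into a PySpark DataFrame with spark.createDataFrame(). Note that the request runs on the driver, not on the executors, so this approach suits responses that fit in the driver's memory.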

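```python
import requests
from pyspark.sql import SparkSession

# Create a SparkSession object
spark = SparkSession.builder.appName("ETL").getOrCreate()

# Send a GET request to the API endpoint (timeout in seconds)
response = requests.get("https://api.example.com/data", timeout=10)
response.raise_for_status()  # fail fast on 4xx/5xx responses

# Convert the response data to a PySpark DataFrame:
# a JSON array becomes one row per element, a single JSON object becomes one row
data = response.json()
rows = data if isinstance(data, list) else [data]
df = spark.createDataFrame(rows)
```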

### Databases

To extract data from a SQL database using PySpark, you can use the JDBC data source. In the example below, it connects to a PostgreSQL database running on the local machine, with the url, table, and properties variables specifying the connection. The spark.read.jdbc() method then reads the table into a PySpark DataFrame, which can be used to analyze and manipulate the data. Finally, df.printSchema() and df.show() display the DataFrame schema and the first few rows, to verify that the data was loaded correctly.

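Note that the specific method for connecting to a SQL database will depend on the database you're using and the specific data you want to extract. You may need to adjust the code to handle authentication, database drivers, or other database-specific features. In particular, the database's JDBC driver JAR must be available to Spark, for example through the spark.jars.packages configuration.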


```python
from pyspark.sql import SparkSession

# Create a SparkSession object
spark = SparkSession.builder.appName("ETL").getOrCreate()

# Define the JDBC connection properties
url = "jdbc:postgresql://localhost:5432/my_database"
table = "my_table"
properties = {
    "user": "my_user",
    "password": "my_password",
    "driver": "org.postgresql.Driver"
}

# Read data from the database into a DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)

# Display the DataFrame schema
df.printSchema()

# Display the first few rows of the DataFrame
df.show(5)
```