# Advanced Data Preparation with Best Practices
---
### Prerequisites:
- Basic to intermediate Python knowledge.
- Familiarity with Pandas or similar libraries.
- Understanding of basic machine learning concepts is a plus but not required.



---
**Story of the day**


![img.jpg](img.jpg)

- Doubts like an investigator
- Follows his intuition

## Data Ingestion
Data ingestion is an important part of the data life cycle. It is about collecting and importing data from different sources into a storage system where it can be accessed and analyzed.

The goal is to make the data high quality, accurate, and available for use.

So, data ingestion = importing + processing + cleaning + transforming data.

## Ways of Ingestion
### 1- Batch Ingestion:

Batch ingestion is a way to collect, process, and load large amounts of data into a system at scheduled times rather than in real time. This approach helps organizations handle large volumes of data efficiently by grouping it into batches and processing them together.

Benefits of Batch Ingestion

#### Why:
- efficient use of system resources,
- cost-effectiveness,
- simplified data management,
- robustness (how well it works when things go wrong),
- scalability (can grow or handle more work).

Common Use Cases for Batch Ingestion

#### When:
- data analytics and reporting.
- Extract, Transform, and Load (ETL) processes.
- backups and archiving.
- performance reporting.

The Batch Ingestion Process

Key steps: data extraction, data transformation, data loading, and scheduling and automation.

#### process:
- Data Extraction: Collect data
- Data Transformation: Clean, transform, and prepare the data for analysis.
- Data Loading: Load the processed data into a target system.
- Scheduling and Automation: Use tools to automate the process and schedule it to run at regular intervals.
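The steps above can be sketched as a small extract, transform, load job. This is only an illustration using the standard library: the `RAW_CSV` data, the `measurements` table, and all function names are assumptions made up for the example. In a real setup, a scheduler such as cron would call `run_batch_job` at regular intervals.

```python
import csv
import io
import sqlite3

# Hypothetical raw export; in practice this would be a file or an API response.
RAW_CSV = """id,value
1,10
2,
3,42
"""

def extract(raw_text):
    """Extraction: read the raw rows from the source."""
    return list(csv.DictReader(io.StringIO(raw_text)))

def transform(rows):
    """Transformation: drop rows with a missing value and cast types."""
    cleaned = []
    for row in rows:
        if not row["value"]:
            continue  # skip incomplete records
        cleaned.append((int(row["id"]), int(row["value"])))
    return cleaned

def load(records, connection):
    """Loading: write the cleaned records into the target table."""
    connection.execute(
        "CREATE TABLE IF NOT EXISTS measurements (id INTEGER PRIMARY KEY, value INTEGER)"
    )
    connection.executemany("INSERT INTO measurements VALUES (?, ?)", records)
    connection.commit()

def run_batch_job(raw_text, connection):
    """One scheduled run: extract, transform, then load."""
    records = transform(extract(raw_text))
    load(records, connection)
    return len(records)

connection = sqlite3.connect(":memory:")
loaded = run_batch_job(RAW_CSV, connection)
print(f"Loaded {loaded} records")
```

Keeping each step in its own function makes it easier to test the steps separately and to rerun a single step when a batch fails.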

In [2]:
import time
import random

# Simulating a batch ingestion system
def generate_data():
    # Simulate incoming data
    return [{"id": i, "value": random.randint(1, 100)} for i in range(5)]

def process_batch(batch):
    print("Processing batch:")
    for record in batch:
        print(f"Processing record: {record}")
    print("Batch processed!\n")

# Batch ingestion logic
def batch_ingestion():
    for i in range(4):
        batch = generate_data()
        print("New batch received!")
        process_batch(batch)
        time.sleep(5)  # Wait before the next batch

# Run the batch ingestion
batch_ingestion()

New batch received!
Processing batch:
Processing record: {'id': 0, 'value': 11}
Processing record: {'id': 1, 'value': 37}
Processing record: {'id': 2, 'value': 31}
Processing record: {'id': 3, 'value': 15}
Processing record: {'id': 4, 'value': 75}
Batch processed!

New batch received!
Processing batch:
Processing record: {'id': 0, 'value': 78}
Processing record: {'id': 1, 'value': 41}
Processing record: {'id': 2, 'value': 100}
Processing record: {'id': 3, 'value': 19}
Processing record: {'id': 4, 'value': 10}
Batch processed!

New batch received!
Processing batch:
Processing record: {'id': 0, 'value': 100}
Processing record: {'id': 1, 'value': 91}
Processing record: {'id': 2, 'value': 81}
Processing record: {'id': 3, 'value': 20}
Processing record: {'id': 4, 'value': 65}
Batch processed!

New batch received!
Processing batch:
Processing record: {'id': 0, 'value': 75}
Processing record: {'id': 1, 'value': 22}
Processing record: {'id': 2, 'value': 36}
Processing record: {'id': 3, 'value'

### 2- Streaming ingestion:

Streaming ingestion collects, processes, and loads data into a system in real time.

It is used when data must be acted on immediately, for example in financial trading, fraud detection, and sensor data monitoring.

#### Why:
- Real-time insights
- Continuous data processing
- Scalability
- Cost savings in cloud-based environments

However, streaming ingestion also has some challenges:
- Implementing a streaming ingestion system can be complex.
- Continuous processing requires constant computational resources.
- Ensuring data consistency and accuracy can be difficult.

#### When:
- Real-time fraud detection and security monitoring
- IoT and sensor data
- Online recommendations and personalization
- Financial market data
- Telecommunications
- Real-time logistics and supply chain management

#### process:
- Data extraction: Collecting data
- Data transformation: Cleaning, transforming, and preparing the data for analysis
- Data loading: Loading the processed data into a target storage system 
- Monitoring and alerting (new here; it takes the place of the scheduling step in batch ingestion): Ensuring the streaming ingestion process runs smoothly and consistently
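The monitoring and alerting step can be sketched as follows. This is a minimal illustration, not a production design: the `event_stream` source, the `max_error_rate` threshold, and the printed alert are all assumptions chosen for the example. The idea is to count successes and failures while records flow through, and to raise an alert when the failure rate gets too high.

```python
def event_stream():
    """Hypothetical source: yields records one at a time, some malformed."""
    raw_values = [12, 47, None, 80, "oops", 33]
    for i, value in enumerate(raw_values):
        yield {"id": i, "value": value}

def ingest_with_monitoring(stream, max_error_rate=0.25):
    """Process each record on arrival and track simple health metrics."""
    metrics = {"processed": 0, "failed": 0}
    loaded = []
    for record in stream:
        try:
            # Transformation: make sure the value is an integer.
            loaded.append({"id": record["id"], "value": int(record["value"])})
            metrics["processed"] += 1
        except (TypeError, ValueError):
            metrics["failed"] += 1
        # Monitoring: check the running error rate after every record.
        seen = metrics["processed"] + metrics["failed"]
        error_rate = metrics["failed"] / seen
        if error_rate > max_error_rate:
            print(f"ALERT: error rate {error_rate:.0%} after {seen} records")
    return loaded, metrics

loaded, metrics = ingest_with_monitoring(event_stream())
print(metrics)
```

In a real system the alert would usually go to a monitoring tool or a messaging channel instead of `print`.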


In [3]:
import time
import random

# Simulating a streaming data source
def generate_streaming_data():
    for i in range(6):
        yield {"id": random.randint(1, 100), "value": random.randint(1, 100)}

def process_streaming_data(record):
    print(f"Processing record: {record}")
    time.sleep(1)  # Simulate processing time

# Streaming ingestion logic
def streaming_ingestion():
    stream = generate_streaming_data()
    for record in stream:
        process_streaming_data(record)

# Run the streaming ingestion
streaming_ingestion()

Processing record: {'id': 72, 'value': 44}
Processing record: {'id': 77, 'value': 28}
Processing record: {'id': 53, 'value': 33}
Processing record: {'id': 5, 'value': 12}
Processing record: {'id': 24, 'value': 86}
Processing record: {'id': 90, 'value': 77}


### 3- Semi-real-time ingestion: (also known as near real-time ingestion or micro-batch processing)

Semi-real-time ingestion processes data with minimal delay, typically in seconds or minutes, rather than instantly.

This approach balances real-time and batch processing, providing timely insights while reducing the resource intensity and complexity associated with true real-time systems.

#### Examples:
- Real-time analytics, where data needs to be tracked live, such as stock prices or market trends
- Social media monitoring, where companies track mentions and sentiments to manage brand reputation
- Customer experience enhancement, where support teams need up-to-date information on customer issues and behavior
- Operational dashboards, where near-immediate data processing is beneficial but not critically time-sensitive

#### Why:
- Timely insights without the complexity of real-time systems.
- Reduced resource intensity compared to real-time systems.
- Suitable for applications where near-immediate data processing is beneficial but **not critically** time-sensitive.

#### When: 
- Streaming analytics for real-time dashboards and visualizations
- Social media sentiment analysis for brand reputation management
- Customer experience enhancement for support teams and personalized marketing messages
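The micro-batch idea can be sketched as grouping a continuous stream into small fixed-size batches. This is a minimal sketch under assumptions: `event_stream` and `micro_batches` are hypothetical helper names, and batches are cut by record count only. Real micro-batch systems usually also flush a batch after a time window has passed.

```python
from itertools import islice

def event_stream(n_events):
    """Hypothetical source: yields one event at a time."""
    for i in range(n_events):
        yield {"id": i, "value": i * 10}

def micro_batches(stream, batch_size):
    """Group a stream into lists of at most batch_size records."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # the stream is exhausted
        yield batch

batches = list(micro_batches(event_stream(7), batch_size=3))
for batch in batches:
    print(f"Processing {len(batch)} records: {[r['id'] for r in batch]}")
```

The last batch may be smaller than `batch_size`, so downstream code should not assume a fixed length.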


In [4]:
import time
import random

"""
micro-batch
"""
# Simulating semi-real-time data generation
def generate_data():
    return [{"id": i, "value": random.randint(1, 100)} for i in range(3)]

def process_batch(batch):
    print("Processing semi-real-time batch:")
    for record in batch:
        print(f"Processed record: {record}")
    print("Batch processed!\n")

# Semi-real-time ingestion logic
def semi_real_time_ingestion():
    for i in range(4):
        batch = generate_data()
        print("New batch received!")
        process_batch(batch)
        time.sleep(1)  # Wait for the next batch

# Run semi-real-time ingestion
semi_real_time_ingestion()

New batch received!
Processing semi-real-time batch:
Processed record: {'id': 0, 'value': 55}
Processed record: {'id': 1, 'value': 65}
Processed record: {'id': 2, 'value': 83}
Batch processed!

New batch received!
Processing semi-real-time batch:
Processed record: {'id': 0, 'value': 17}
Processed record: {'id': 1, 'value': 59}
Processed record: {'id': 2, 'value': 67}
Batch processed!

New batch received!
Processing semi-real-time batch:
Processed record: {'id': 0, 'value': 24}
Processed record: {'id': 1, 'value': 34}
Processed record: {'id': 2, 'value': 95}
Batch processed!

New batch received!
Processing semi-real-time batch:
Processed record: {'id': 0, 'value': 68}
Processed record: {'id': 1, 'value': 10}
Processed record: {'id': 2, 'value': 25}
Batch processed!



### So, when to use semi-real-time vs real-time?

#### Use Real-Time Ingestion for:
Applications that require immediate insights and actions, such as:
- Fraud detection
- Stock trading
- Live monitoring systems
- Applications where latency is critical, and data needs to be processed instantly

#### Use Semi-Real-Time (Micro-Batch) Ingestion for:
Applications where near-immediate data processing is beneficial but **not critically** time-sensitive:
- Social media monitoring
- Customer feedback analysis
- Operational dashboards
- Real-time analytics (where data needs to be tracked live, but some delay is acceptable)
- Applications where reducing resource intensity and complexity is important, but timely insights are still required

In general, if the application requires instant processing and action, choose real-time ingestion.

If the application can tolerate some delay (seconds or minutes) and still benefit from timely insights, semi-real-time ingestion may be a more suitable option.
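As a rough illustration, the rule of thumb above can be written as a small helper. The function name and the cutoff values (1 second and 10 minutes) are assumptions picked for the example, not fixed industry thresholds. Adjust them to your own requirements.

```python
def choose_ingestion_mode(max_delay_seconds, realtime_cutoff=1.0, micro_batch_cutoff=600.0):
    """Suggest an ingestion mode from the delay the application can tolerate.

    The cutoffs are illustrative assumptions, not standard values.
    """
    if max_delay_seconds < realtime_cutoff:
        return "streaming"    # instant processing and action required
    if max_delay_seconds <= micro_batch_cutoff:
        return "micro-batch"  # seconds or minutes of delay are acceptable
    return "batch"            # longer delays fit scheduled batch jobs

for delay in (0.1, 30, 86400):
    print(delay, "->", choose_ingestion_mode(delay))
```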

## Data Source Solutions 
There are many types of data sources available for ingestion, including:
- traditional file formats,
- databases,
- APIs,
- message queues,
- streaming services.


### Event Data Processing Solution

Event data processing involves ingesting, processing, and responding to events in **real time**.

#### Apache Kafka:
Apache Kafka is an open-source event streaming platform that can handle **large amounts of data** in **real time**.

<img src="attachment:868fa556-d32b-4a59-bb38-5ffdf8de71a0.png" width="400"/>


Kafka's main components include:

- Ingestion: Data streams can be ingested into Kafka using Kafka producers.
- Processing: Kafka can process streams of data using Kafka Streams.
- Storage: Kafka stores data streams in distributed, fault-tolerant clusters.
- Consumption: Data streams can be consumed from Kafka using Kafka consumers.

![image.png](attachment:4d2ecb50-786b-4aab-b99c-e4e38f9ca989.png)



#### Kafka Libraries for Python

- kafka-python: A pure Python implementation of Kafka's protocol, offering a simple and easy-to-use interface.
- confluent-kafka-python: A high-performance library developed and maintained by Confluent, ideal for production environments.
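A producer and consumer pair with kafka-python might look like the sketch below. It assumes a broker reachable at `localhost:9092` and a topic named `events`; both are placeholders, as are the helper function names. The Kafka imports sit inside the functions so the serialization helpers can be tried without a broker or the library installed.

```python
import json

def serialize(record):
    """Encode a dict as UTF-8 JSON bytes, the form sent to Kafka."""
    return json.dumps(record).encode("utf-8")

def deserialize(payload):
    """Decode UTF-8 JSON bytes back into a dict."""
    return json.loads(payload.decode("utf-8"))

def produce_events(records, topic="events", servers="localhost:9092"):
    """Ingestion: publish each record to a topic with a Kafka producer."""
    from kafka import KafkaProducer  # requires `pip install kafka-python`
    producer = KafkaProducer(bootstrap_servers=servers, value_serializer=serialize)
    for record in records:
        producer.send(topic, value=record)
    producer.flush()  # block until buffered messages are sent
    producer.close()

def consume_events(topic="events", servers="localhost:9092", timeout_ms=5000):
    """Consumption: read records until no message arrives for timeout_ms."""
    from kafka import KafkaConsumer  # requires `pip install kafka-python`
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",  # start from the oldest available message
        value_deserializer=deserialize,
        consumer_timeout_ms=timeout_ms,
    )
    try:
        return [message.value for message in consumer]
    finally:
        consumer.close()

# Round trip of the serialization helpers; no broker needed for this part.
sample = {"id": 1, "value": 42}
round_tripped = deserialize(serialize(sample))
print(round_tripped)
```

With a broker running, calling `produce_events([sample])` followed by `consume_events()` should return the published records.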

Databases

### Two main types of databases:

- Classic databases (relational databases): Store data in a fixed schema, ideal for applications that require **complex querying** and transactional consistency.
- NoSQL databases: Store data in a flexible schema format, ideal for applications that require high-performance reads and writes and **scalability**, such as real-time analytics and e-commerce platforms.

Ingesting Data from Databases


### Here's a brief overview of some popular libraries and when to use them:

- sqlite3: Suitable for small to medium-sized applications, local storage, and prototyping. Avoid using for large-scale applications or those requiring high concurrency or advanced security features.
- SQLAlchemy: Suitable for large-scale production environments that need flexibility, scalability, and support for multiple database engines. Avoid using for small, lightweight applications where the overhead of its comprehensive ORM capabilities is unnecessary.
- Psycopg2: The go-to choice for interacting with PostgreSQL databases. Ideal for production environments requiring reliability and efficiency in handling PostgreSQL databases.
- MySQL Connector/Python: Great for applications that need to interact directly with MySQL databases. Suitable for environments where compatibility and official support from Oracle are critical. Do not use if your application requires compatibility with multiple database systems or a higher-level abstraction.
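As a small illustration of ingesting from a relational database, the sketch below uses the built-in `sqlite3` module. The `orders` table, its columns, and the helper names are made up for the example. An in-memory database stands in for a real source.

```python
import sqlite3

def build_sample_database():
    """Create a hypothetical in-memory source database for the demo."""
    connection = sqlite3.connect(":memory:")
    connection.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer TEXT, amount REAL)"
    )
    connection.executemany(
        "INSERT INTO orders (id, customer, amount) VALUES (?, ?, ?)",
        [(1, "alice", 20.0), (2, "bob", 35.5), (3, "alice", 12.25)],
    )
    connection.commit()
    return connection

def ingest_orders(connection, min_amount):
    """Read matching rows and return them as plain dictionaries."""
    connection.row_factory = sqlite3.Row  # lets rows be accessed by column name
    cursor = connection.execute(
        "SELECT id, customer, amount FROM orders WHERE amount >= ? ORDER BY id",
        (min_amount,),  # parameter binding avoids SQL injection
    )
    return [dict(row) for row in cursor]

connection = build_sample_database()
orders = ingest_orders(connection, min_amount=15)
print(orders)
connection.close()
```

The same pattern (connect, run a parameterized query, convert rows) carries over to Psycopg2 and MySQL Connector/Python, although their connection arguments and placeholder styles differ.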

When to Choose Each Library

- Use sqlite3 for small, lightweight applications or prototyping.
- Use SQLAlchemy for large-scale production environments that need flexibility and scalability.
- Use Psycopg2 for applications that interact with PostgreSQL databases.
- Use MySQL Connector/Python for applications that interact with MySQL databases and require compatibility and official support from Oracle.

NoSQL Databases

### Data Ingestion from NoSQL Databases

NoSQL databases offer flexibility, scalability, and performance for handling unstructured or semi-structured data. We will explore the key Python libraries for working with popular NoSQL databases and examine when and how to use them effectively.

NoSQL databases are ideal for storing and processing large volumes of semi-structured or unstructured data in batch operations. They are particularly effective when:

- The schema can evolve over time
- Diverse data types need to be handled in a unified way

#### NoSQL libraries:
- pymongo: A Python library for interacting with MongoDB. It supports CRUD operations, index management, and complex queries, making it user-friendly and suitable for various applications.
- cassandra-driver: A Python library for Apache Cassandra, a scalable NoSQL database optimized for large, distributed datasets, ideal for real-time analytics and high-availability applications.
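A batch read from MongoDB with pymongo could be sketched as below. The connection URI, the `demo` database, the `events` collection, and both helper names are placeholders. `fetch_documents` needs a running MongoDB server, so the demo at the bottom uses hand-written documents instead. The `normalize` helper shows one way to deal with an evolving schema by aligning documents that have different fields.

```python
def fetch_documents(uri="mongodb://localhost:27017", database="demo", collection="events"):
    """Read all documents from a MongoDB collection (needs a running server)."""
    from pymongo import MongoClient  # requires `pip install pymongo`
    client = MongoClient(uri)
    try:
        # Exclude MongoDB's internal _id field from the results.
        return list(client[database][collection].find({}, {"_id": 0}))
    finally:
        client.close()

def normalize(documents):
    """Align documents with differing fields to one shared set of columns."""
    columns = sorted({key for doc in documents for key in doc})
    rows = [{column: doc.get(column) for column in columns} for doc in documents]
    return columns, rows

# Hypothetical documents standing in for the output of fetch_documents().
documents = [
    {"device": "a1", "temp": 21.5},
    {"device": "b2", "temp": 19.0, "humidity": 40},
]
columns, rows = normalize(documents)
print(columns)
print(rows)
```

Missing fields are filled with `None`, which keeps every row the same shape for later loading into a table or DataFrame.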


Common use cases for NoSQL databases include:
- IoT devices
- Logs
- Social media feeds

### See you in the next lecture on improving data quality