## Python for Data Engineering

Python plays a central role in data engineering thanks to its versatile, mature libraries. It is used across data science, machine learning, AI, data visualization, and data engineering, including big data processing with frameworks like Apache Spark, workflow orchestration, and web scraping. In this post, I’ll walk through the parts of the language I find most useful for data engineering, along with their use cases.

Let's start with a quick walkthrough of the built-in data structures Python provides:
- list
- tuple
- dictionary
- set

### List

Mutable: Lists are mutable, meaning you can modify their elements after the list is created. You can add, remove, or modify elements.

In [1]:
my_list = [1, 2, 3, 'a', 'b', 'c']
my_list.append(4)     # Adds 4 to the end of the list
my_list.remove('a')  # Removes the element 'a'

In [2]:
print(my_list)

[1, 2, 3, 'b', 'c', 4]


### List Operations
**Some useful operations you can use with lists:**
- list comprehension
- filter
- map
- reduce

#### List comprehension
List comprehension offers a shorter syntax when you want to create a new list based on the values of an existing list. This allows you to simplify code.

In [9]:
cubes = []

# normal syntax
for i in range(10):
    cubes.append(i**3)
print(cubes)

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]


In [10]:
# with list comprehension

cubes = [i**3 for i in range(10)]
print(cubes)

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
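
A comprehension can also filter while it builds the list by adding an `if` clause at the end. Here is a minimal sketch of that form (the name `even_cubes` is only for illustration):

```python
# Keep only the cubes of even numbers by adding a condition to the comprehension
even_cubes = [i**3 for i in range(10) if i % 2 == 0]
print(even_cubes)  # [0, 8, 64, 216, 512]
```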


#### Filter
As the name suggests, `filter` keeps only the elements of an iterable for which a condition function returns `True`.

In [11]:
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# Filter even numbers
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
# Output: [2, 4, 6, 8]

In [12]:
print(even_numbers)

[2, 4, 6, 8]


#### Lambda Function
We're taking a quick detour here, since we've just run into an anonymous function (lambda function).

```
lambda x: x % 2 == 0
```
This is an anonymous function (lambda function) that takes an input `x` and returns `True` if `x` is even (`x % 2 == 0`), otherwise `False`.

The `filter` function works hand in hand with it: it takes the lambda function as input and applies it to each element in `numbers`, keeping only the elements for which the lambda returns `True` (i.e., the even numbers).

In [13]:
# we could use a regular function instead of a lambda function

def is_even(x):
    return x % 2 == 0

In [14]:
print(is_even(10))

True


In [22]:
result = filter(is_even, numbers) # filter() returns a filter object(an iterator)

N.B: **By definition, an iterator can be iterated through once, producing all values, and is then exhausted.**

In [23]:
result

<filter at 0x7939b814d5a0>

In [24]:
for i in result:
    print(i)

2
4
6
8


In [25]:
# or we can use list() to convert it into a list
even_num = list(filter(is_even, numbers))
print(even_num)

[2, 4, 6, 8]
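
The note about iterators being exhausted after one pass is easy to check. The sketch below consumes the same `filter` object twice; the names `first_pass` and `second_pass` are only for illustration:

```python
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]

def is_even(x):
    return x % 2 == 0

result = filter(is_even, numbers)

first_pass = list(result)   # consumes every value the iterator can produce
second_pass = list(result)  # the iterator is now exhausted, so nothing is left

print(first_pass)   # [2, 4, 6, 8]
print(second_pass)  # []
```

If you need the values more than once, store them in a list first or call `filter` again.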


#### Map
The `map` function applies a function to every element of a list (or any iterable) and returns an iterator over the results.

In [26]:
def power2(val: int) -> int:
    return val*val

numbers = [1, 2, 3, 4, 5]

power_numbers = list(map(power2, numbers))
# OR
squared_numbers = list(map(lambda x: x**2, numbers))

# Output: [1, 4, 9, 16, 25]

In [27]:
print(numbers)

[1, 2, 3, 4, 5]


In [28]:
print(squared_numbers)

[1, 4, 9, 16, 25]
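
`map` also accepts several iterables and passes one element from each to the function on every step. A small sketch, assuming two made-up lists `prices` and `quantities`, with the equivalent list comprehension for comparison:

```python
# Hypothetical sample data, used only to illustrate map with two iterables
prices = [10, 20, 30]
quantities = [1, 2, 3]

# The lambda receives one element from each list per call
totals = list(map(lambda price, qty: price * qty, prices, quantities))

# The same result written as a list comprehension over zip()
totals_comprehension = [price * qty for price, qty in zip(prices, quantities)]

print(totals)                # [10, 40, 90]
print(totals_comprehension)  # [10, 40, 90]
```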


#### Reduce

The `reduce()` function is a powerful tool in Python that operates on a list (or any iterable), applies a function to its elements, and ‘reduces’ them to a single output.

In [39]:
from functools import reduce

words = ["apple", "banana", "orange", "apple", "grape", "banana"]

# Count the occurrences of each word
word_counts = reduce(lambda counts, word: {**counts, word: counts.get(word, 0) + 1}, words, {})

print(word_counts)
# Output: {'apple': 2, 'banana': 2, 'orange': 1, 'grape': 1}


numbers = [1, 2, 3, 4, 5]
product = reduce((lambda x, y: x * y), numbers)
print(product)

# Output: 120

{'apple': 2, 'banana': 2, 'orange': 1, 'grape': 1}
120
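
The `reduce` word count above builds a new dictionary on every step, which gets expensive for long lists. For these two particular tasks the standard library already has dedicated tools, so a possible alternative (not a replacement for learning `reduce`) looks like this:

```python
from collections import Counter
from math import prod  # available since Python 3.8

words = ["apple", "banana", "orange", "apple", "grape", "banana"]

# Counter counts hashable items in a single pass
word_counts = Counter(words)
print(dict(word_counts))  # {'apple': 2, 'banana': 2, 'orange': 1, 'grape': 1}

# prod multiplies all elements of an iterable
product = prod([1, 2, 3, 4, 5])
print(product)  # 120
```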


### Dictionary
Dictionaries store data as key-value pairs. A dictionary is mutable and does not allow duplicate keys: assigning to an existing key overwrites its value.

In [4]:
thisdict = {
  "brand": "Ford",
  "model": "Mustang",
  "year": 1964
}

In [5]:
print(thisdict)

{'brand': 'Ford', 'model': 'Mustang', 'year': 1964}
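
A short sketch of what "changeable" and "no duplicate keys" mean in practice. The extra keys (`color`, `engine`) and the `duplicate` dictionary are made up for illustration:

```python
thisdict = {"brand": "Ford", "model": "Mustang", "year": 1964}

thisdict["year"] = 2020      # update the value of an existing key
thisdict["color"] = "red"    # add a new key-value pair

# get() returns a default instead of raising KeyError for a missing key
engine = thisdict.get("engine", "unknown")
print(engine)  # unknown

# items() iterates over key-value pairs
for key, value in thisdict.items():
    print(key, "->", value)

# A repeated key does not create a second entry: the last value wins
duplicate = {"brand": "Ford", "brand": "Tesla"}
print(duplicate)  # {'brand': 'Tesla'}
```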


### Tuple
Immutable: Tuples are immutable, meaning once they are created, their elements cannot be changed or modified. Unlike lists, you can’t add or remove elements from a tuple.

In [1]:
my_tuple = (1, 2, 3, 'a', 'b', 'c')
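
To see immutability in action, the sketch below tries to change an element and catches the resulting `TypeError`. It also shows unpacking, a common way to read values out of a tuple (the names `first` and `rest` are only for illustration):

```python
my_tuple = (1, 2, 3, 'a', 'b', 'c')

error_message = None
try:
    my_tuple[0] = 99  # tuples do not support item assignment
except TypeError as exc:
    error_message = str(exc)
    print("TypeError:", error_message)

# Unpacking: the first element goes to `first`, the remainder to the list `rest`
first, *rest = my_tuple
print(first, rest)  # 1 [2, 3, 'a', 'b', 'c']
```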

### Set
Unordered and Unique Elements: Sets are unordered collections of unique elements. They do not allow duplicate values, and the order of elements is not guaranteed.

Syntax: Defined using curly braces {} or by using the set() constructor.

In [7]:
my_set = {1, 2, 3}
my_set.add(1) # 1 is already in the set, so nothing changes
my_set.add(1) # adding it again still won't create a duplicate
print(my_set)

{1, 2, 3}
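
Sets are handy for deduplicating values and for comparing collections. A minimal sketch with made-up data (`raw_ids`, `batch_a`, and `batch_b` are hypothetical names); `sorted()` is used when printing because set order is not guaranteed:

```python
# Hypothetical IDs containing duplicates
raw_ids = [3, 1, 3, 2, 1]
unique_ids = set(raw_ids)          # duplicates are dropped
print(sorted(unique_ids))          # [1, 2, 3]

batch_a = {1, 2, 3}
batch_b = {3, 4, 5}

print(sorted(batch_a | batch_b))   # union: [1, 2, 3, 4, 5]
print(sorted(batch_a & batch_b))   # intersection: [3]
print(sorted(batch_a - batch_b))   # difference: [1, 2]
```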


### Generators
A generator is Python's shortcut for writing your own iterator. A generator function does not return a single value; instead, it returns an iterator object that produces a sequence of values. Inside a generator function, the `yield` keyword is used instead of `return`. Because values are produced one at a time, you can iterate over data without loading the entire dataset into memory, which makes generators suitable for processing huge files.

Benefits of generators:
- Memory efficient: only the current value is held, not the entire list.
- On-demand execution: values are generated only when requested, so no work is spent on items you never consume.
- Lazy evaluation: useful for large datasets or infinite sequences.
- No large intermediate lists: avoids building big lists in memory just to loop over them once.

When to use generators?
- when dealing with large datasets (e.g., reading big files line by line)
- when you don't need to store all values at once
- when you need lazy evaluation (e.g., streaming data)

In [42]:
text = """
transaction_id,user\r
1,aaa\r
\r
2,xx\r
3,ccc\r
\r
"""

In [44]:
print(text) # this is how it looks


transaction_id,user
1,aaa

2,xx
3,ccc




In [49]:
text.split("\r")[0].strip().upper() # e.g. of a processed line

'TRANSACTION_ID,USER'

In [63]:
# creating a generator function
def process_large_files(text):
    for line in text.split("\r"):
        processed_line = line.strip().upper()
        # print("begin processing")
        if processed_line != "":
            # Yield the processed line
            yield processed_line

In [64]:
for processed_line in process_large_files(text):
    print(processed_line)

TRANSACTION_ID,USER
1,AAA
2,XX
3,CCC
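
The example above calls `text.split("\r")`, which still builds the full list of lines in memory. With a real file you can iterate over the file object itself, which reads one line at a time. Below is a minimal sketch of that idea; `read_clean_lines` is a hypothetical name, and `io.StringIO` stands in for a file you would normally get from `open(path)`:

```python
import io

def read_clean_lines(file_obj):
    """Yield stripped, upper-cased, non-empty lines one at a time."""
    for line in file_obj:  # file objects are iterated lazily, line by line
        cleaned = line.strip().upper()
        if cleaned:
            yield cleaned

# io.StringIO is only a stand-in for a real file opened with open(path)
fake_file = io.StringIO("transaction_id,user\n1,aaa\n\n2,xx\n3,ccc\n\n")

lines = read_clean_lines(fake_file)
print(next(lines))  # TRANSACTION_ID,USER
print(list(lines))  # ['1,AAA', '2,XX', '3,CCC']
```

Nothing is read until `next()` or a loop asks for a value, so only one line needs to be held at a time.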


### Enumerate
The `enumerate` function adds a counter to an iterable and returns an enumerate object that yields `(index, value)` pairs.

In [41]:
# normal way
lista = ['a','b','c','d','e']
count = 0
for l in lista:
    print('Index:', count,' Value:', l)
    count+=1
print('')
# enumerate way
for count, l in enumerate(lista):
    print('Index:', count,' Value:', l)

Index: 0  Value: a
Index: 1  Value: b
Index: 2  Value: c
Index: 3  Value: d
Index: 4  Value: e

Index: 0  Value: a
Index: 1  Value: b
Index: 2  Value: c
Index: 3  Value: d
Index: 4  Value: e
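
`enumerate` also takes an optional `start` argument, which is useful when you want human-friendly numbering such as line numbers starting at 1. A short sketch (the name `numbered` is only for illustration):

```python
lista = ['a', 'b', 'c']

# start=1 makes the counter begin at 1 instead of 0
numbered = list(enumerate(lista, start=1))
print(numbered)  # [(1, 'a'), (2, 'b'), (3, 'c')]

for line_number, value in enumerate(lista, start=1):
    print('Line:', line_number, 'Value:', value)
```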


### Decorators
Decorators add functionality to functions (or classes) without changing the code inside them. For example, a decorator can display the value a function returns and modify it before handing it back.

In [65]:
def make_upper(function):
    def upper():
        f = function()
        print(f"this is from origin value: {f}")
        return f.upper()
    return upper

In [66]:
def helloworld():
    return "hello world"

In [67]:
helloworld()

'hello world'

In [68]:
@make_upper
def helloworld():
    return "hello world"

In [69]:
helloworld()

this is from origin value: hello world


'HELLO WORLD'
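
One side effect of wrapping a function is that the decorated name now points at the inner function, so attributes like `__name__` and the docstring change. A common remedy is `functools.wraps`. The sketch below is a variation of `make_upper`, not the version used above:

```python
from functools import wraps

def make_upper(function):
    @wraps(function)  # copy __name__, __doc__, etc. from the wrapped function
    def upper(*args, **kwargs):
        return function(*args, **kwargs).upper()
    return upper

@make_upper
def helloworld():
    """Return a greeting."""
    return "hello world"

print(helloworld())         # HELLO WORLD
print(helloworld.__name__)  # helloworld
print(helloworld.__doc__)   # Return a greeting.
```

Without `@wraps`, `helloworld.__name__` would report `upper`.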

In [6]:
import time

def retry(times, wait):

    def decorator(func):
        def newfn(*args, **kwargs):
            attempt = 0
            while attempt < times:
                try:
                    time.sleep(wait)
                    return func(*args, **kwargs)
                except Exception as e:
                    print(
                        'Exception thrown when attempting to run %s, attempt '
                        '%d of %d' % (func, attempt, times)
                    )
                    attempt += 1
            time.sleep(wait)
            return func(*args, **kwargs)
        return newfn
    return decorator

@retry(times=3, wait=2)
def get_from_rest():
    print('Try read data from rest API')

    raise ConnectionError('Lack of connection')

get_from_rest()

Try read data from rest API
Exception thrown when attempting to run <function get_from_rest at 0x72bfb3b15b40>, attempt 0 of 3
Try read data from rest API
Exception thrown when attempting to run <function get_from_rest at 0x72bfb3b15b40>, attempt 1 of 3
Try read data from rest API
Exception thrown when attempting to run <function get_from_rest at 0x72bfb3b15b40>, attempt 2 of 3
Try read data from rest API


ConnectionError: Lack of connection
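
Note that the `retry` decorator above makes one more call after the loop finishes, so the function runs up to `times + 1` times (four "Try read data" lines for `times=3`), and it sleeps before the first attempt as well. If you want exactly `times` attempts with a pause only between failures, one possible variant is sketched below. The `flaky` function and the `calls` list are hypothetical, used only to demonstrate the behaviour:

```python
import time
from functools import wraps

def retry(times, wait):
    """Call the function up to `times` times, sleeping `wait` seconds between failures."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    print(f"Attempt {attempt} of {times} for {func.__name__} failed: {exc}")
                    if attempt == times:
                        raise  # out of attempts: re-raise the last error
                    time.sleep(wait)
        return wrapper
    return decorator

calls = []  # records each invocation of the hypothetical function below

@retry(times=3, wait=0)
def flaky():
    """Hypothetical function that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("Lack of connection")
    return "ok"

print(flaky())  # two failure messages are printed first, then: ok
```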

#### An easy way to understand what a decorator is and how to use it

To my knowledge, a decorator is a function that changes another function's behaviour without changing the base function's underlying structure or code.

It reduces code duplication and makes your code more reusable, readable, and cleaner.

Decorators are used heavily in frameworks like Flask, Django, and FastAPI.

In [7]:
def get_biriyani(): # a function that gets you biriyani.
    print("Here's your biriyani")

In [9]:
get_biriyani() # voila!

Here's your biriyani


In [26]:
# now we want to add some flavour to our biriyani: do we want chicken biriyani or beef biriyani,
# or even better, a kacchi biriyani!!!

# but we don't want to change our base function because, let's face it, not everyone has the same taste;
# people may choose their own version of biriyani, and that's completely okay!

# so, to avoid changing the base function's code while letting everyone get the biriyani of their choosing,
# we use a decorator here

def add_flavour(func): # this function accepts a function as a parameter
    def wrapper(*args, **kwargs):
        print("Adding your flavour!")
        return func(*args, **kwargs) # pass the wrapped function's result back to the caller
    return wrapper

@add_flavour
def get_biriyani(flavour): # a function that gets you biriyani.
    print(f"Here's your {flavour} biriyani")

In [27]:
get_biriyani("chicken")

Adding your flavour!
Here's your chicken biriyani


### Data Class
A data class in Python is a class designed primarily for storing and representing data. The `@dataclass` decorator generates the boilerplate methods for you.

A data class takes care of things like displaying values and comparing objects. You don’t need to write an `__init__` constructor to assign values, and you don’t need to implement `__repr__` or `__eq__` for debugging and object comparison. A `__hash__` method is generated only if you ask for it, for example with `@dataclass(frozen=True)`.

In [28]:
from dataclasses import dataclass

@dataclass #dataclass decorator
class customerD:
    name: str
    id: int
    surname: str

class customer:
    def __init__(self, name, aid, surname):
        self.name = name
        self.aid = aid
        self.surname = surname

In [30]:
obj1 = customer("Mahmud", 1234, "Humayun")
obj2 = customerD("Hasan", 2341, "Ahmed")

In [31]:
obj1

<__main__.customer at 0x72bfbcd4cf70>

In [32]:
obj2

customerD(name='Hasan', id=2341, surname='Ahmed')

In [33]:
obj3 = customerD("Hasan", 2341, "Ahmed")

In [35]:
obj2 == obj3 # built in equality check

True

Data classes are also easy to convert to a dictionary or tuple with `asdict` and `astuple`. This simplifies the code when we need to work with those formats, for example to save the data as JSON.

In [37]:
from dataclasses import dataclass, astuple, asdict
import json

@dataclass #dataclass decorator
class customer:
    name: str # type hints
    id: int
    surname: str


Obj1 = customer("Erick",1254,"Nowak")

print(astuple(Obj1))
print(asdict(Obj1))

# Output:
# ('Erick', 1254, 'Nowak')
# {'name': 'Erick', 'id': 1254, 'surname': 'Nowak'}

json.dumps(asdict(Obj1))

('Erick', 1254, 'Nowak')
{'name': 'Erick', 'id': 1254, 'surname': 'Nowak'}


'{"name": "Erick", "id": 1254, "surname": "Nowak"}'
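
Two dataclass options come up often in data pipelines: `frozen=True`, which makes instances immutable and hashable (so they can be used in sets or as dictionary keys), and `field(default_factory=...)`, which gives each instance its own mutable default. The classes `CustomerKey` and `Order` below are hypothetical and only illustrate these options:

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)  # frozen instances are immutable and hashable
class CustomerKey:
    name: str
    id: int

@dataclass
class Order:
    customer: CustomerKey
    items: list = field(default_factory=list)  # a fresh list for every instance

key = CustomerKey("Hasan", 2341)
seen = {key}  # works because frozen dataclasses define __hash__
print(CustomerKey("Hasan", 2341) in seen)  # True

order = Order(key)
order.items.append("book")
print(order.items)       # ['book']
print(Order(key).items)  # [] (the default list is not shared between instances)
```

A regular (non-frozen) dataclass with the default `eq=True` is unhashable, so `hash(order)` would raise a `TypeError`.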

### Concurrency vs Parallelism
Concurrency and parallelism are two different mechanisms for executing tasks in a script. For a data engineer, these techniques help speed up work such as retrieving data from an API or transforming data.

#### Multithreading Pools
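Multithreading runs multiple threads of code within a single process. A thread is a lightweight unit of execution that shares the same memory space as its parent process. In standard CPython builds, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads mostly help with I/O-bound work, such as waiting on network responses, rather than CPU-bound computation. To use thread pools in Python, you can use the `ThreadPoolExecutor` class from the `concurrent.futures` module. Using threads, we can issue several API calls at once to retrieve data.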

In [38]:
import calendar
from concurrent.futures import ThreadPoolExecutor
import requests

def generate_dates(year, month):
    _, last_day = calendar.monthrange(year, month)

    dates = [f"{year}-{month:02d}-{day:02d}" for day in range(1, last_day + 1)]
    return dates

year = 2023
month = 10
result = generate_dates(year, month)
urls = []
for x in result:
    urls.append(f"http://api.nbp.pl/api/exchangerates/rates/a/gbp/{x}/")

def download_page(url):
    # a timeout keeps a stalled request from blocking its worker thread forever
    response = requests.get(url, timeout=10)
    return response.content

with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(download_page, urls))

for result in results:
    print(result)

b'404 NotFound - Not Found - Brak danych'
b'{"table":"A","currency":"funt szterling","code":"GBP","rates":[{"no":"190/A/NBP/2023","effectiveDate":"2023-10-02","mid":5.3183}]}'
b'{"table":"A","currency":"funt szterling","code":"GBP","rates":[{"no":"191/A/NBP/2023","effectiveDate":"2023-10-03","mid":5.3195}]}'
b'{"table":"A","currency":"funt szterling","code":"GBP","rates":[{"no":"192/A/NBP/2023","effectiveDate":"2023-10-04","mid":5.3492}]}'
b'{"table":"A","currency":"funt szterling","code":"GBP","rates":[{"no":"193/A/NBP/2023","effectiveDate":"2023-10-05","mid":5.3057}]}'
b'{"table":"A","currency":"funt szterling","code":"GBP","rates":[{"no":"194/A/NBP/2023","effectiveDate":"2023-10-06","mid":5.3195}]}'
b'404 NotFound - Not Found - Brak danych'
b'404 NotFound - Not Found - Brak danych'
b'{"table":"A","currency":"funt szterling","code":"GBP","rates":[{"no":"195/A/NBP/2023","effectiveDate":"2023-10-09","mid":5.2897}]}'
b'{"table":"A","currency":"funt szterling","code":"GBP","rates":[{"no"
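
To see the effect of the thread pool without depending on a live API, the sketch below replaces the network call with `time.sleep`, which releases the GIL much like waiting on a socket does. `fake_download` and the `example.com` URLs are placeholders, not part of the NBP example above:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(url):
    """Hypothetical stand-in for a network call: sleeps instead of doing I/O."""
    time.sleep(0.1)
    return f"response for {url}"

urls = [f"https://example.com/{day:02d}" for day in range(1, 11)]

# One request after another
start = time.perf_counter()
sequential = [fake_download(u) for u in urls]
sequential_time = time.perf_counter() - start

# Up to five requests in flight at once
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    threaded = list(executor.map(fake_download, urls))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

`executor.map` returns results in the same order as the input URLs, even though the calls overlap in time.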

### Asynchronous Programming

A note about concurrency and parallelism:

True parallelism means an application runs multiple tasks at the same time where each task runs on a separate processing unit.

Concurrency means that an application is making progress on more than one task at the same time but may switch between tasks instead of actually running them in parallel. 

This example from Stack Overflow illustrates the difference well:
**Concurrency is two lines of customers ordering from a single cashier (lines take turns ordering); Parallelism is two lines of customers ordering from two cashiers (each line gets its own cashier)**.

In computing terms, each customer is a task and each cashier is a processor; the processor handles its customers/tasks either concurrently or in parallel.

In [40]:
import asyncio
from random import randint
from time import perf_counter
from typing import Any, Awaitable

from req_http import http_get, http_get_sync

# The highest Pokemon id
MAX_POKEMON = 898

def get_random_pokemon_name_sync() -> str:
    pokemon_id = randint(1, MAX_POKEMON)
    pokemon_url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}"
    pokemon = http_get_sync(pokemon_url)
    return str(pokemon["name"])


async def get_random_pokemon_name() -> str:
    pokemon_id = randint(1, MAX_POKEMON)
    pokemon_url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}"
    pokemon = await http_get(pokemon_url)
    return str(pokemon["name"])


async def main() -> None:

    # synchronous call
    time_before = perf_counter()
    for _ in range(20):
        get_random_pokemon_name_sync()
    print(f"Total time (synchronous): {perf_counter() - time_before}")

    # asynchronous call
    time_before = perf_counter()
    await asyncio.gather(*[get_random_pokemon_name() for _ in range(20)])
    print(f"Total time (asynchronous): {perf_counter() - time_before}")


asyncio.run(main())


ImportError: cannot import name 'http_get' from 'req_http' (/home/mahmudhasan/Work/problem_solving/LeetCode-Supremacy/pythonforDE/req_http.py)
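
The `req_http` module imported above is a local helper whose contents are not shown in this post. To illustrate the same sequential-versus-`asyncio.gather` comparison in a self-contained way, the sketch below swaps the HTTP call for `asyncio.sleep`. `fake_http_get` is a hypothetical stand-in, and no real request is sent to the URLs:

```python
import asyncio
from time import perf_counter

async def fake_http_get(url: str) -> dict:
    """Hypothetical stand-in for an async HTTP call: waits instead of fetching."""
    await asyncio.sleep(0.1)
    return {"name": f"pokemon-from-{url}"}

async def main() -> tuple:
    urls = [f"https://pokeapi.co/api/v2/pokemon/{i}" for i in range(1, 11)]

    # Awaiting each call in turn: total time is roughly the sum of the waits
    start = perf_counter()
    one_by_one = [await fake_http_get(u) for u in urls]
    sequential_time = perf_counter() - start

    # gather() schedules all coroutines at once: the waits overlap
    start = perf_counter()
    gathered = await asyncio.gather(*(fake_http_get(u) for u in urls))
    concurrent_time = perf_counter() - start

    print(f"Total time (one by one): {sequential_time:.2f}s")
    print(f"Total time (gather): {concurrent_time:.2f}s")
    return one_by_one, list(gathered), sequential_time, concurrent_time

one_by_one, gathered, sequential_time, concurrent_time = asyncio.run(main())
```

This is concurrency rather than parallelism: a single thread switches between tasks while each one is waiting.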