<a href="https://colab.research.google.com/github/sylv0303/Git/blob/main/Multiprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
from datetime import datetime
from functools import partial
from multiprocessing import Pool
from typing import List

import pandas as pd
import pyodbc
from dateutil.relativedelta import relativedelta
from pandas import DataFrame, DateOffset

In [6]:
!pip install pyodbc

Collecting pyodbc
  Downloading pyodbc-5.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (334 kB)
Installing collected packages: pyodbc
Successfully installed pyodbc-5.1.0


# Optimization and Multiprocessing
This notebook defines two functions that use multiprocessing to speed up data extraction. Running the SQL queries in parallel lets up to 40 extractions run at the same time (the pool size used below), although the effective parallelism depends on the number of CPU cores available on the machine. This is particularly useful for large data volumes, and it reduced execution time considerably.

The two functions are `extract_data_chunk` and `extract_data`. `extract_data_chunk` performs a single extraction from the database. It takes an extraction date, a database connection, the list of columns to extract, and a flag that turns the exposure filter on or off. It runs one SQL query and returns the result as a DataFrame.


In [23]:
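def extract_data_chunk(extract_date: str, cnxn: pyodbc.Connection, cols_to_extract: List[str], expo_filter: bool = True) -> DataFrame:
    """Run one aggregation query and return the result as a DataFrame."""
    # Column names are interpolated into the SQL text, so they must come from trusted code.
    cols = ", ".join(cols_to_extract)
    params = None
    if expo_filter:
        # Note: this branch does not restrict the query to extract_date.
        query = f"""SELECT {cols}, SUM(expo) AS expo
                    FROM table_name
                    GROUP BY {cols};"""
    else:
        # The date is passed as a bound parameter (pyodbc uses ? placeholders).
        query = f"""SELECT {cols}, SUM(expo) AS expo
                    FROM table_name
                    WHERE date_encours = ?
                    GROUP BY {cols};"""
        params = [extract_date]

    # With pyodbc, leaving the with block commits; it does not close the connection.
    with cnxn:
        df = pd.read_sql(query, con=cnxn, params=params)

    return df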
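
The query construction described above can be separated from the database call, which makes it checkable without a connection. The following is a minimal sketch under stated assumptions, not the notebook's implementation: `build_chunk_query` and its identifier check are hypothetical, `table_name`, `expo` and `date_encours` are taken from the query above, and the `?` placeholder assumes a driver such as pyodbc that uses question-mark parameters.

```python
import re
from typing import List, Optional, Sequence, Tuple

# Hypothetical safeguard: accept only plain SQL identifiers as column names.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_chunk_query(
    cols_to_extract: Sequence[str], extract_date: Optional[str] = None
) -> Tuple[str, List[str]]:
    """Return the SQL text and its bound parameters for one extraction."""
    if not cols_to_extract:
        raise ValueError("at least one column is required")
    for col in cols_to_extract:
        if not _IDENTIFIER.match(col):
            raise ValueError(f"unexpected column name: {col!r}")

    cols = ", ".join(cols_to_extract)
    sql = f"SELECT {cols}, SUM(expo) AS expo FROM table_name"
    params: List[str] = []
    if extract_date is not None:
        # The date travels as a parameter instead of being pasted into the SQL.
        sql += " WHERE date_encours = ?"
        params.append(extract_date)
    sql += f" GROUP BY {cols};"
    return sql, params


sql, params = build_chunk_query(["date_encours", "score"], "2024-01-01")
```

The returned pair could then be handed to `pd.read_sql(sql, con=cnxn, params=params)`. Keeping the date out of the SQL string avoids quoting problems and SQL injection through the date value.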

`extract_data`, in turn, uses multiprocessing to run the extraction for several dates in parallel. It takes a list of dates together with the same connection, column list and filter flag, and uses `functools.partial` to bind those shared arguments to `extract_data_chunk`. The resulting function is mapped over the dates by a pool of 40 workers, so up to 40 SQL queries run at once. The per-date results are then concatenated into a single DataFrame.

In [22]:
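def extract_data(date_list: List[str], cnxn: pyodbc.Connection, cols_to_extract: List[str], expo_filter: bool) -> DataFrame:
    # Bind the arguments shared by every query; only the date varies per task.
    partial_extract = partial(extract_data_chunk, cnxn=cnxn, cols_to_extract=cols_to_extract, expo_filter=expo_filter)

    # Pool of 40 worker processes; map returns the results in date_list order.
    with Pool(40) as p:
        res = p.map(partial_extract, date_list)

    # Stack the per-date DataFrames and renumber the rows from 0.
    df = pd.concat(res, ignore_index=True)

    return df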
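
The bind-then-map pattern used by `extract_data` can be tried without a database. The sketch below is illustrative only and swaps in two things, stated plainly: it uses `multiprocessing.pool.ThreadPool` (threads, same `map` interface) instead of a process `Pool`, and plain lists of dicts instead of DataFrames. With a process pool, every bound argument has to be picklable, which open database connections generally are not, so a thread pool is one way to keep a shared-arguments design runnable. `fake_extract_chunk` and `extract_all` are hypothetical names.

```python
from functools import partial
from multiprocessing.pool import ThreadPool
from typing import Dict, List


def fake_extract_chunk(
    extract_date: str, cols_to_extract: List[str], expo_filter: bool = True
) -> List[Dict[str, object]]:
    """Hypothetical stand-in for one per-date query: returns a single row."""
    return [
        {
            "date_encours": extract_date,
            "n_cols": len(cols_to_extract),
            "expo_filter": expo_filter,
        }
    ]


def extract_all(
    date_list: List[str],
    cols_to_extract: List[str],
    expo_filter: bool,
    max_workers: int = 40,
) -> List[Dict[str, object]]:
    """Run one task per date and flatten the per-date results into one list."""
    if not date_list:
        return []
    # Bind the shared arguments so that map only has to supply the date.
    worker = partial(
        fake_extract_chunk, cols_to_extract=cols_to_extract, expo_filter=expo_filter
    )
    # Never start more workers than there are dates to process.
    n_workers = min(max_workers, len(date_list))
    with ThreadPool(n_workers) as pool:
        chunks = pool.map(worker, date_list)  # results keep the input order
    return [row for chunk in chunks for row in chunk]


rows = extract_all(["2024-01-01", "2024-02-01", "2024-03-01"], ["score"], expo_filter=False)
```

Capping the worker count at the number of dates avoids starting idle workers when only a few months are requested.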

## Example


In [None]:
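def dra_D_C(date_debut: str, date_fin: str, expo_filter: bool) -> DataFrame:
  date_format = "%Y-%m-%d"
  date_start = datetime.strptime(date_debut, date_format)
  date_end = datetime.strptime(date_fin, date_format)

  # Build the list of monthly extraction dates between the two bounds.
  date_list = []
  while date_start <= date_end:
    date_list.append(date_start.strftime(date_format))
    date_start += relativedelta(months=1)

  # Extract all dates in one parallel call, once the list is complete.
  # GRIDS_CONN is a pyodbc connection that must be defined before calling this function.
  data = extract_data(
    date_list=date_list,
    cnxn=GRIDS_CONN,
    cols_to_extract=["date_encours", "easy_number", "dra_type", "score"],
    expo_filter=expo_filter,
  )

  # Make sure date_encours is a datetime before shifting it back one month.
  data["date_encours"] = pd.to_datetime(data["date_encours"])
  data["date_m1"] = data["date_encours"] + DateOffset(months=-1)

  filter_dra_d = data[data["dra_type"] == "D"]
  filter_dra_c = data[data["dra_type"] == "C"]

  # Pair each "D" row with the "C" row of the same easy_number one month later.
  result = filter_dra_d.merge(filter_dra_c, left_on=["easy_number", "date_encours"], right_on=["easy_number", "date_m1"], how="inner", suffixes=("_debut", "_fin"))

  result["score_fin"] = pd.to_numeric(result["score_fin"])
  result["score_debut"] = pd.to_numeric(result["score_debut"])
  result["score_diff"] = result["score_fin"] - result["score_debut"]

  # Both sides carry an expo column, so the merge suffixes it.
  # Summing the starting exposure (expo_debut) is an assumption.
  out = (
    result.groupby(["score_debut", "score_diff"])
    .agg({"easy_number": "count", "expo_debut": "sum"})
    .reset_index()
  )

  return out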
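
The monthly date list built at the start of `dra_D_C` can be isolated in a small helper and checked on its own. This is a sketch using only the standard library rather than `relativedelta`; `monthly_dates` is a hypothetical name. One assumed difference: each date is anchored to the day of the start date and clamped to the month's length, instead of adding one month to the previous result.

```python
import calendar
from datetime import date, datetime
from typing import List


def monthly_dates(date_debut: str, date_fin: str, date_format: str = "%Y-%m-%d") -> List[str]:
    """Return one formatted date per month from date_debut to date_fin inclusive."""
    start = datetime.strptime(date_debut, date_format).date()
    end = datetime.strptime(date_fin, date_format).date()

    dates: List[str] = []
    year, month = start.year, start.month
    while True:
        # Clamp the day so that, for example, the 31st maps to the last day of shorter months.
        last_day = calendar.monthrange(year, month)[1]
        current = date(year, month, min(start.day, last_day))
        if current > end:
            break
        dates.append(current.strftime(date_format))
        # Move to the next calendar month, rolling over the year after December.
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return dates


date_list = monthly_dates("2023-11-01", "2024-02-01")
# date_list == ["2023-11-01", "2023-12-01", "2024-01-01", "2024-02-01"]
```

The resulting list is what would be passed as `date_list` to `extract_data`, with one query per entry.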
