In [1]:
import pandas as pd
import great_expectations as gx
from zipfile import ZipFile
import requests
from pathlib import Path
import json

In [2]:
!pip3 install duckdb -q -U


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [None]:
# !curl -Ls "https://info.stackoverflowsolutions.com/rs/719-EMH-566/images/stack-overflow-developer-survey-2022.zip" -o stack-overflow-developer-survey-2022.zip
# https://survey.stackoverflow.co/datasets/stack-overflow-developer-survey-2017.zip

In [8]:
from functools import wraps
from time import time

def timing(f):
    """Decorator that reports how long each call to ``f`` takes.

    After the wrapped call returns, one line is printed with the function
    name, the keyword arguments of that call and the elapsed seconds.
    Positional arguments are not included in the message. The return value
    of ``f`` is passed through unchanged.
    """
    @wraps(f)
    def timed_call(*args, **kw):
        started = time()
        outcome = f(*args, **kw)
        elapsed = time() - started
        print('Function :%r with args:[ %r] took: %2.4f sec' % (f.__name__, kw, elapsed))
        return outcome

    return timed_call

@timing
def extract(year):
    """Download the Stack Overflow developer survey archive for ``year``.

    The zip is fetched from survey.stackoverflow.co and written to
    ``./stack_data/<year>/stack-overflow-developer-survey-<year>.zip``; the
    directory is created when it does not exist yet.

    Args:
        year: Survey year, used in both the download URL and the local path.

    Returns:
        Path of the zip file written to disk.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    print(f'Extractiing data for the {year} year.. ')
    # Bound the request so a stalled connection cannot hang the whole run
    # (10 s to connect, 300 s between received chunks).
    resp = requests.get(
        f"https://survey.stackoverflow.co/datasets/stack-overflow-developer-survey-{year}.zip",
        timeout=(10, 300),
    )
    # Fail here on an error status instead of saving the error page as a ".zip".
    resp.raise_for_status()
    data_dir = Path(f'./stack_data/{year}')
    data_dir.mkdir(exist_ok=True, parents=True)
    zip_path = data_dir / f'stack-overflow-developer-survey-{year}.zip'
    zip_path.write_bytes(resp.content)
    return zip_path

@timing
def transform(file_path: Path):
    """Extract the first CSV member of a survey zip next to the archive.

    Args:
        file_path: Path of the downloaded zip archive.

    Returns:
        Path of the extracted CSV (``file_path.parent / <member name>``), or
        ``None`` when the archive contains no member with a ``.csv`` suffix.
    """
    print(f'Unzipping data for stackoverflow survey at {file_path}.. ')
    # The context manager closes the archive handle even if extraction fails.
    with ZipFile(file_path) as archive:
        for member in archive.infolist():
            if Path(member.filename).suffix == '.csv':
                archive.extract(member.filename, path=file_path.parent)
                return file_path.parent / member.filename
    # No CSV found: keep the historical "return None" contract, but explicitly.
    return None
@timing
def dq_checks(file_path: Path, validation_config_filepath: str = 'stackoverflow_survey_data_expectations_suite.json' ):
    """Validate a survey CSV against an expectation suite and persist the outcome.

    The CSV is read with latin-1 encoding, wrapped as a Great Expectations
    dataset and validated with the suite referenced by
    ``validation_config_filepath``. The validation result is written as JSON
    to ``<csv stem>_dq_results.json`` beside the CSV and is also returned.

    Args:
        file_path: Path of the CSV file to validate.
        validation_config_filepath: Expectation suite handed to ``validate``.

    Returns:
        The object returned by the dataset's ``validate`` call.
    """
    print(f'Performing data quality checks using validation config from {validation_config_filepath}..')
    survey_dataset = gx.from_pandas(gx.read_csv(file_path,  encoding='latin-1'))
    validation = survey_dataset.validate(expectation_suite=validation_config_filepath)

    # Keep a copy on disk so the outcome can be inspected later.
    report_path = file_path.parent / f'{file_path.stem}_dq_results.json'
    with open(report_path, 'w') as report_file:
        json.dump(validation.to_json_dict(), fp=report_file, indent=4)

    return validation

@timing
def summerise_expectation_results(exp_results: dict):
    """Print one line per expectation stating whether it succeeded.

    Args:
        exp_results: Validation result exposing ``get('results')``; each entry
            is read via ``['expectation_config']['expectation_type']`` and
            ``['success']``.
    """
    print('DQ result summary..')
    # .get() yields None when there is no 'results' entry; treat that as
    # "nothing to report" instead of failing on iteration.
    for result in exp_results.get('results') or []:
        print(f"\t * Success on {result['expectation_config']['expectation_type']} : {result['success']}")

def load(file_path: Path, year: int, db_path: Path = Path('.')):
    """Load a survey CSV into the ``stack_<year>`` table of the DuckDB file.

    The database file is ``<db_path>/stack_data_db.db``; the table is created
    or replaced from the CSV via ``read_csv_auto``. Loading is best-effort:
    ordinary errors (including a missing ``duckdb`` package) are printed and
    not re-raised.

    Args:
        file_path: Path of the CSV file to load.
        year: Survey year, used as the table-name suffix.
        db_path: Directory that holds ``stack_data_db.db``.

    Returns:
        None.
    """
    db_client = None
    try:
        import duckdb
        db_file = str((db_path / 'stack_data_db.db').resolve())
        print(f"Loading data for the year {year} from {file_path} at {db_file}..")
        # Double any single quote so the path stays a valid SQL string literal.
        csv_literal = str(file_path.absolute()).replace("'", "''")
        db_client = duckdb.connect(database=db_file)
        db_client.execute(f"create or replace table stack_{year} as select * from read_csv_auto('{csv_literal}')")
        db_client.commit()
    except Exception as e:
        # Exception (not BaseException) so Ctrl-C / SystemExit still propagate.
        print(f'Error occured while loading data..{str(e)}')
    finally:
        # Release the connection even when the load fails part-way.
        if db_client is not None:
            db_client.close()

@timing
def orchestrate(start_year, end_year, load_only_if_dq_passes=False):
    """Run extract -> transform -> DQ checks -> load for each survey year.

    Args:
        start_year: First survey year to process (inclusive).
        end_year: Last survey year to process (inclusive).
        load_only_if_dq_passes: When True, a year is loaded only if its
            validation result reports success. Defaults to False, which loads
            every year regardless of the DQ outcome.
    """
    print(f'Starting stackoverflow survey ETL from {start_year} till {end_year}..\n', end='')
    for year in range(start_year, end_year + 1):
        file_path = extract(year)
        data_file_path = transform(file_path)
        results = dq_checks(data_file_path)
        dq_passed = results.get("success")
        print(f'DQ results : {dq_passed}')
        summerise_expectation_results(results)

        # By default the DQ outcome is informational only and the load always runs.
        if dq_passed or not load_only_if_dq_passes:
            load(data_file_path, year)
        else:
            print(f'Skipping load for the year {year}: DQ checks failed.')
        print('\n')

if __name__ == '__main__':
    # Per the original note, the dataset URL used by extract() serves the
    # 2011-2021 surveys; the URL changed for later years. This run covers
    # 2017 through 2021.
    orchestrate(2017, 2021)

Starting stackoverflow survey ETL from 2017 till 2021..
Extractiing data for the 2017 year.. 
Function :'extract' with args:[ {}] took: 0.2408 sec
Unzipping data for stackoverflow survey at stack_data/2017/stack-overflow-developer-survey-2017.zip.. 
Function :'transform' with args:[ {}] took: 1.0599 sec
Performing data quality checks using validation config from stackoverflow_survey_data_expectations_suite.json..
Function :'dq_checks' with args:[ {}] took: 1.8880 sec
DQ results : True
DQ result summary..
	 * Success on expect_table_columns_to_match_set : True
	 * Success on expect_table_column_count_to_equal : True
Function :'summerise_expectation_results' with args:[ {}] took: 0.0000 sec
Loading data for the year 2017 from stack_data/2017/survey_results_public.csv at /workspaces/stack/airflow/data/stack_data_db.db..


Extractiing data for the 2018 year.. 
Function :'extract' with args:[ {}] took: 0.2279 sec
Unzipping data for stackoverflow survey at stack_data/2018/stack-overflow-deve

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))



Extractiing data for the 2019 year.. 
Function :'extract' with args:[ {}] took: 0.2411 sec
Unzipping data for stackoverflow survey at stack_data/2019/stack-overflow-developer-survey-2019.zip.. 
Function :'transform' with args:[ {}] took: 1.6262 sec
Performing data quality checks using validation config from stackoverflow_survey_data_expectations_suite.json..
Function :'dq_checks' with args:[ {}] took: 3.3102 sec
DQ results : False
DQ result summary..
	 * Success on expect_table_columns_to_match_set : False
	 * Success on expect_table_column_count_to_equal : False
Function :'summerise_expectation_results' with args:[ {}] took: 0.0000 sec
Loading data for the year 2019 from stack_data/2019/survey_results_public.csv at /workspaces/stack/airflow/data/stack_data_db.db..


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))



Extractiing data for the 2020 year.. 
Function :'extract' with args:[ {}] took: 2.5795 sec
Unzipping data for stackoverflow survey at stack_data/2020/stack-overflow-developer-survey-2020.zip.. 
Function :'transform' with args:[ {}] took: 0.6980 sec
Performing data quality checks using validation config from stackoverflow_survey_data_expectations_suite.json..
Function :'dq_checks' with args:[ {}] took: 1.4432 sec
DQ results : False
DQ result summary..
	 * Success on expect_table_columns_to_match_set : False
	 * Success on expect_table_column_count_to_equal : False
Function :'summerise_expectation_results' with args:[ {}] took: 0.0000 sec
Loading data for the year 2020 from stack_data/2020/survey_results_public.csv at /workspaces/stack/airflow/data/stack_data_db.db..


Extractiing data for the 2021 year.. 
Function :'extract' with args:[ {}] took: 2.2670 sec
Unzipping data for stackoverflow survey at stack_data/2021/stack-overflow-developer-survey-2021.zip.. 
Function :'transform' with

In [11]:
import duckdb

# Preview a few rows of the table loaded for the 2020 survey.
db_client = duckdb.connect(database='/workspaces/stack/airflow/data/stack_data_db.db',)
file = '/workspaces/stack/airflow/data/stack_data/2017/survey_results_public.csv'
# db_client.execute(f"create or replace table stack_2017 as select * from read_csv_auto('{file}')")
try:
    # Fetch the rows into a DataFrame while the connection is still open, so
    # there is something to display once it has been closed.
    stack_2020_preview = db_client.sql('select * from stack_2020 limit 10;').df()
    db_client.commit()
finally:
    # Always release the database file, even if the query fails.
    db_client.close()

# Last expression of the cell, so the notebook renders it.
stack_2020_preview


In [None]:
# gx expectation experiments
def dq_checks(file_path):
    """Experiment: build the expectation suite JSON from a survey CSV.

    Reads the CSV with latin-1 encoding, prints its column names, registers a
    column-set expectation and a column-count expectation (154), prints the
    result of the column-set expectation, and writes the resulting suite
    (failed expectations included) to
    ``stackoverflow_survey_data_expectations_suite.json``.

    NOTE(review): this shares its name with the pipeline's ``dq_checks``
    defined earlier in the notebook, so running this cell rebinds that name.
    """
    import json

    survey_df = pd.read_csv(file_path, encoding='latin-1')
    print(survey_df.columns.to_list())
    dataset = gx.from_pandas(survey_df)

    # Column names the column-set expectation is checked against.
    expected_columns = ['Respondent', 'Professional', 'ProgramHobby', 'Country', 'University', 'EmploymentStatus', 'FormalEducation', 'MajorUndergrad', 'HomeRemote', 'CompanySize', 'CompanyType', 'YearsProgram', 'YearsCodedJob', 'YearsCodedJobPast', 'DeveloperType', 'WebDeveloperType', 'MobileDeveloperType', 'NonDeveloperType', 'CareerSatisfaction', 'JobSatisfaction', 'ExCoderReturn', 'ExCoderNotForMe', 'ExCoderBalance', 'ExCoder10Years', 'ExCoderBelonged', 'ExCoderSkills', 'ExCoderWillNotCode', 'ExCoderActive', 'PronounceGIF', 'ProblemSolving', 'BuildingThings', 'LearningNewTech', 'BoringDetails', 'JobSecurity', 'DiversityImportant', 'AnnoyingUI', 'FriendsDevelopers', 'RightWrongWay', 'UnderstandComputers', 'SeriousWork', 'InvestTimeTools', 'WorkPayCare', 'KinshipDevelopers', 'ChallengeMyself', 'CompetePeers', 'ChangeWorld', 'JobSeekingStatus', 'HoursPerWeek', 'LastNewJob', 'AssessJobIndustry', 'AssessJobRole', 'AssessJobExp', 'AssessJobDept', 'AssessJobTech', 'AssessJobProjects', 'AssessJobCompensation', 'AssessJobOffice', 'AssessJobCommute', 'AssessJobRemote', 'AssessJobLeaders', 'AssessJobProfDevel', 'AssessJobDiversity', 'AssessJobProduct', 'AssessJobFinances', 'ImportantBenefits', 'ClickyKeys', 'JobProfile', 'ResumePrompted', 'LearnedHiring', 'ImportantHiringAlgorithms', 'ImportantHiringTechExp', 'ImportantHiringCommunication', 'ImportantHiringOpenSource', 'ImportantHiringPMExp', 'ImportantHiringCompanies', 'ImportantHiringTitles', 'ImportantHiringEducation', 'ImportantHiringRep', 'ImportantHiringGettingThingsDone', 'Currency', 'Overpaid', 'TabsSpaces', 'EducationImportant', 'EducationTypes', 'SelfTaughtTypes', 'TimeAfterBootcamp', 'CousinEducation', 'WorkStart', 'HaveWorkedLanguage', 'WantWorkLanguage', 'HaveWorkedFramework', 'WantWorkFramework', 'HaveWorkedDatabase', 'WantWorkDatabase', 'HaveWorkedPlatform', 'WantWorkPlatform', 'IDE', 'AuditoryEnvironment', 'Methodology', 'VersionControl', 'CheckInCode', 'ShipIt',
        'OtherPeoplesCode', 'ProjectManagement', 'EnjoyDebugging', 'InTheZone', 'DifficultCommunication', 'CollaborateRemote', 'MetricAssess', 'EquipmentSatisfiedMonitors', 'EquipmentSatisfiedCPU', 'EquipmentSatisfiedRAM', 'EquipmentSatisfiedStorage', 'EquipmentSatisfiedRW', 'InfluenceInternet', 'InfluenceWorkstation', 'InfluenceHardware', 'InfluenceServers', 'InfluenceTechStack', 'InfluenceDeptTech', 'InfluenceVizTools', 'InfluenceDatabase', 'InfluenceCloud', 'InfluenceConsultants', 'InfluenceRecruitment', 'InfluenceCommunication', 'StackOverflowDescribes', 'StackOverflowSatisfaction', 'StackOverflowDevices', 'StackOverflowFoundAnswer', 'StackOverflowCopiedCode', 'StackOverflowJobListing', 'StackOverflowCompanyPage', 'StackOverflowJobSearch', 'StackOverflowNewQuestion', 'StackOverflowAnswer', 'StackOverflowMetaChat', 'StackOverflowAdsRelevant', 'StackOverflowAdsDistracting', 'StackOverflowModeration', 'StackOverflowCommunity', 'StackOverflowHelpful', 'StackOverflowBetter', 'StackOverflowWhatDo', 'StackOverflowMakeMoney', 'Gender', 'HighestEducationParents', 'Race', 'SurveyLong', 'QuestionsInteresting', 'QuestionsConfusing', 'InterestedAnswers', 'Salary', 'ExpectedSalary']

    # Free-form metadata attached to the column-set expectation.
    column_notes = {
        "notes": {
           "content": ["Based on our domain expertise, these columns should exist always" ],
           "format": "markdown",
           "source": "https://survey.stackoverflow.co/"
        }
    }

    result = dataset.expect_table_columns_to_match_set(expected_columns, meta=column_notes)
    dataset.expect_table_column_count_to_equal(value=154)
    print(result)

    # Keep failed expectations too, so the saved suite holds both checks.
    suite = dataset.get_expectation_suite(discard_failed_expectations=False)
    with open("stackoverflow_survey_data_expectations_suite.json", "w") as my_file:
        json.dump(suite.to_json_dict(), my_file, sort_keys=True, indent=4)

# Run the experiment on the 2011 survey export (absolute, workspace-specific path).
dq_checks('/workspaces/stack/airflow/data/stack_data/2011/2011 Stack Overflow Survey Results.csv')

['What Country or Region do you live in?', 'Which US State or Territory do you live in?', 'How old are you?', 'How many years of IT/Programming experience do you have?', 'How would you best describe the industry you work in?', 'Which best describes the size of your company?', 'Which of the following best describes your occupation?', 'How likely is it that a recommendation you make will be acted upon?', 'What is your involvement in purchasing? You can choose more than 1.', 'Unnamed: 9', 'Unnamed: 10', 'Unnamed: 11', 'Unnamed: 12', 'Unnamed: 13', 'Unnamed: 14', 'What types of purchases are you involved in?', 'Unnamed: 16', 'Unnamed: 17', 'Unnamed: 18', 'Unnamed: 19', 'Unnamed: 20', 'What is your budget for outside expenditures (hardware, software, consulting, etc) for 2011?', 'Unnamed: 22', 'Unnamed: 23', 'Unnamed: 24', 'Unnamed: 25', 'Unnamed: 26', 'Unnamed: 27', 'Unnamed: 28', 'What type of project are you developing?', 'Which languages are you proficient in?', 'Unnamed: 31', 'Unnamed:

Next steps:

- Build CLI scripts for ETL and DQ
- Pull and store in a data lake (S3, MinIO, open data formats such as Iceberg, file formats, ClickHouse)
- Load into a data warehouse (Postgres)
- Use Spark/dbt for transformation
- Visualize it (Grafana)
- Set up infra using Terraform
- Orchestrate using Airflow/Dagster

bonus : 
    Add llm layer
    fastapi endpoints
    observability
    
