In [1]:
import math
import os
from urllib.parse import quote_plus

import dask.dataframe as dd ## 대규모 데이터 처리
# import vaex ## 대규모 데이터 처리
import pandas as pd
import pymongo
from dotenv import load_dotenv

In [2]:
# Load environment variables from the env file.
load_dotenv('C:/py_src/awake/env')

# Read the MongoDB credentials from the environment (None when a variable is unset).
mongo_user = os.environ.get('MONGO_USER')
mongo_password = os.environ.get('MONGO_PASSWORD')

In [3]:
# MongoDB connection URL.
# Fail early with a clear message instead of building a URL that contains the literal
# text "None" when a credential is missing from the environment.
if not mongo_user or not mongo_password:
    raise RuntimeError(
        "MONGO_USER and MONGO_PASSWORD must be set in the environment before connecting to MongoDB."
    )

# Reserved characters in the user name / password (e.g. '@', ':', '/', '+', '%') must be
# percent-escaped before being placed in a MongoDB URI.
# NOTE(review): assumes the environment values are the raw credentials, not values that
# were already percent-encoded by hand -- confirm against the env file.
url = (
    f"mongodb+srv://{quote_plus(mongo_user)}:{quote_plus(mongo_password)}"
    "@meercat-external.udyfs.mongodb.net/?retryWrites=true&w=majority&appName=meercat-external"
)
client = pymongo.MongoClient(url, serverSelectionTimeoutMS=100000)

In [4]:
# Connectivity check
from pymongo.errors import ServerSelectionTimeoutError

try:
    # Ask the server for its database list.
    databases = client.list_database_names()
except ServerSelectionTimeoutError as err:
    # Server selection timed out: report it and carry on.
    print("Connection failed:", err)
else:
    print("Connected successfully. Databases:", databases)

Connected successfully. Databases: ['Test', 'admin', 'config', 'local']


In [5]:
# List the database names visible to this client.
client.list_database_names()

['Test', 'admin', 'config', 'local']

In [6]:
# Select the "Test" database.
db = client["Test"]

# Names of the collections stored in the Test database.
collections = db.list_collection_names()

In [7]:
# Number of collection names returned for the Test database.
len(collections)

21

In [8]:
# Unit conversion
def convert_bytes(num):
    """Format a byte count as a human-readable string with two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit is reached, e.g. ``1536`` -> ``"1.50 KB"``.

    Args:
        num: Size in bytes (int or float).

    Returns:
        A string such as ``"13.39 MB"``. Values of 1024 TB or more are
        reported in PB instead of falling off the end of the unit list
        (which previously made the function return ``None``).
    """
    units = ('bytes', 'KB', 'MB', 'GB', 'TB', 'PB')
    size = num
    # Every unit except the last one is used only while the value still fits below 1024.
    for unit in units[:-1]:
        if size < 1024.0:
            return f"{size:.2f} {unit}"
        size /= 1024.0
    # Anything that is still >= 1024 TB is expressed in the largest unit.
    return f"{size:.2f} {units[-1]}"

In [9]:
for collection_name in collections:
    # Fetch the collStats document for this collection.
    stats = db.command("collStats", collection_name)

    # Report data size, document count and total index size, one per line.
    print(
        f"Collection '{collection_name}' size: {convert_bytes(stats['size'])}",
        f"Collection '{collection_name}' document count: {stats['count']}",
        f"Total index size: {convert_bytes(stats['totalIndexSize'])}",
        '--------------------------------------------------------------------',
        sep="\n",
    )

Collection 'hashtag_hashtag' size: 13.39 MB
Collection 'hashtag_hashtag' document count: 3052
Total index size: 172.00 KB
--------------------------------------------------------------------
Collection 'youtube_revenue' size: 259.47 MB
Collection 'youtube_revenue' document count: 811499
Total index size: 46.44 MB
--------------------------------------------------------------------
Collection 'youtube_report_v2' size: 4.60 MB
Collection 'youtube_report_v2' document count: 5303
Total index size: 484.00 KB
--------------------------------------------------------------------


Collection 'influencer_datas' size: 7.78 GB
Collection 'influencer_datas' document count: 3340624
Total index size: 246.63 MB
--------------------------------------------------------------------
Collection 'youtube_channel_demographics' size: 59.67 MB
Collection 'youtube_channel_demographics' document count: 297683
Total index size: 4.69 MB
--------------------------------------------------------------------
Collection 'user_aggregations' size: 30.61 MB
Collection 'user_aggregations' document count: 31597
Total index size: 8.33 MB
--------------------------------------------------------------------
Collection 'campaign_bookmarks' size: 2.49 KB
Collection 'campaign_bookmarks' document count: 26
Total index size: 36.00 KB
--------------------------------------------------------------------
Collection 'youtube_videos' size: 8.33 GB
Collection 'youtube_videos' document count: 302054
Total index size: 124.73 MB
--------------------------------------------------------------------
Collection 

In [11]:
# Collections to load. Instagram-related data is excluded (commented out below).
# 'youtube_videos', 'youtube_datas', 'youtube_daily_channel_basics' and 'youtube_subscriber'
# are only loaded as a sample (see tables_to_sample in the loading cell).
# NOTE(review): the original note said "10% sampling", but the loading code uses 0.05 -- confirm.
collections_need = [
    # 'hashtag_hashtag',
    'youtube_revenue',
    'youtube_report_v2',
    # 'influencer_datas',
    'youtube_channel_demographics',
    # 'user_aggregations',
    # 'campaign_bookmarks',
    'youtube_videos',
    # 'hashtag_hashtaglog',
    'youtube_report',
    'youtube_users',
    # 'alpha_tests',
    # 'influencer_media_datas',
    'youtube_channel_locations',
    'youtube_daily_channel_basics',
    # 'influencer_media_comments',
    # 'user_bookmarks',
    'youtube_subscriber',
    # 'campaigns',
    'youtube_datas',
    # 'instagram_stories'
    ]

In [12]:
# Collections that are only partially loaded (one page of documents instead of everything).
tables_to_sample = ['youtube_videos', 'youtube_datas', 'youtube_daily_channel_basics', 'youtube_subscriber']

# Fraction of each collection in tables_to_sample to load.
# NOTE(review): the original comments said 10%, but the code used 0.05. The value is kept
# at 5% so the amount of data loaded does not change -- confirm which one was intended.
SAMPLE_FRACTION = 0.05
# 1-based index of the page (of sample_size documents) to read.
page_number = 1

# Maps collection name -> pandas DataFrame holding the loaded documents.
youtube_dict = {}
for collection_name in collections_need:
    collection = db[collection_name]

    if collection_name in tables_to_sample:
        # Total number of documents in the collection.
        total_documents = collection.count_documents({})

        # Number of documents per page.
        sample_size = math.ceil(total_documents * SAMPLE_FRACTION)

        # skip/limit without an explicit sort pages over an unspecified order, so sort on
        # _id to make the selected page stable between runs.
        # NOTE: this reads one contiguous page, it is not a random sample.
        cursor = (
            collection.find()
            .sort('_id', pymongo.ASCENDING)
            .skip((page_number - 1) * sample_size)
            .limit(sample_size)
        )
    else:
        # Small enough to load in full.
        cursor = collection.find()

    # Materialise the cursor and convert the documents to a DataFrame.
    youtube_dict[collection_name] = pd.DataFrame(list(cursor))

    print(collection_name)
    print(youtube_dict[collection_name].columns)
    print(">> Success")
    print("--------------------------------------")
    print("")

youtube_revenue
Index(['_id', 'estimated_revenue', 'estimated_ad_revenue',
       'estimated_red_partner_revenue', 'gross_revenue', 'cpm',
       'ad_impressions', 'monetized_playbacks', 'playback_based_cpm',
       'youtube_user_id', 'data_created_at', '__v', 'created_at',
       'updated_at'],
      dtype='object')
>> Success
--------------------------------------

youtube_report_v2
Index(['_id', 'requested', 'youtube_user_id', 'content', 'phone_number',
       'template_code', 'created_at', 'updated_at', '__v'],
      dtype='object')
>> Success
--------------------------------------

youtube_channel_demographics
Index(['_id', 'youtube_user_id', 'end_date', 'created_at', 'updated_at', '__v',
       'demographics'],
      dtype='object')
>> Success
--------------------------------------

youtube_videos
Index(['_id', 'videos', 'youtube_user_id', 'end_date', 'created_at',
       'updated_at', '__v'],
      dtype='object')
>> Success
--------------------------------------

youtube_report

In [192]:
# for key_nm in collections_need:
#     youtube_dict[key_nm].to_csv('C:/py_src/awake/data/' + key_nm + '.csv', encoding='utf-8-sig', index=False)

In [81]:
# Helper that pulls the '_id' entry out of a 'youtubeInformation' value.
def extract_id(youtube_info):
    """Return ``youtube_info['_id']``, or ``''`` when it cannot be looked up.

    Args:
        youtube_info: Normally a mapping with an ``'_id'`` key; any other
            value (e.g. ``None`` or ``NaN``) is tolerated.

    Returns:
        The value stored under ``'_id'``; ``''`` when the key is absent or
        when ``youtube_info`` does not support ``.get``.
    """
    fallback = ''
    try:
        lookup = youtube_info.get
        found = lookup('_id', fallback)
    except (TypeError, AttributeError):
        # Not a mapping-like value (no usable .get): treat as "no id".
        found = fallback
    return found

# Derive 'youtube_user_id' for user_aggregations from the '_id' of each row's
# 'youtubeInformation' value.
# 'user_aggregations' is commented out in collections_need, so guard the lookup instead of
# failing with a KeyError when that collection has not been loaded into youtube_dict.
if 'user_aggregations' in youtube_dict:
    youtube_dict['user_aggregations']['youtube_user_id'] = youtube_dict['user_aggregations']['youtubeInformation'].apply(extract_id)
else:
    print("Skipped: 'user_aggregations' is not loaded in youtube_dict.")

In [13]:
# Normalise the key columns so they compare equal across tables.
for table_name, frame in youtube_dict.items():
    # Only DataFrame values are processed.
    if not isinstance(frame, pd.DataFrame):
        continue

    # Cast '_id' first, then 'youtube_user_id', to plain strings when the column exists.
    # NOTE(review): astype(str) also turns missing values into strings such as 'nan' /
    # 'None' -- confirm that is acceptable for the later key comparisons and merges.
    for key_column in ('_id', 'youtube_user_id'):
        if key_column in frame.columns:
            frame[key_column] = frame[key_column].astype(str)

    # Store the updated DataFrame back under the same key.
    youtube_dict[table_name] = frame

In [83]:
# For every table, count how many key values of each other table also occur in it.
results = {}

for key1, df1 in youtube_dict.items():
    if not isinstance(df1, pd.DataFrame):
        continue

    matches = {}
    results[key1] = matches

    # '_id' is compared first, then 'youtube_user_id', so the result keys keep that order.
    for column in ('_id', 'youtube_user_id'):
        if column not in df1.columns:
            continue
        reference_values = df1[column]

        for key2, df2 in youtube_dict.items():
            if key1 == key2 or not isinstance(df2, pd.DataFrame) or column not in df2.columns:
                continue
            # Number of rows in the other table whose key value appears in this table.
            matches[f'{column}_in_{key2}'] = df2[column].isin(reference_values).sum()

# Print the results
for key, match_info in results.items():
    print(f"DataFrame '{key}'에서의 매칭 정보:")
    for match_key, match_count in match_info.items():
        print(f"  - {match_key}: {match_count}개")
    print()


DataFrame 'hashtag_hashtag'에서의 매칭 정보:
  - _id_in_youtube_revenue: 0개
  - _id_in_youtube_report_v2: 0개
  - _id_in_youtube_channel_demographics: 0개
  - _id_in_user_aggregations: 0개
  - _id_in_campaign_bookmarks: 0개
  - _id_in_hashtag_hashtaglog: 0개
  - _id_in_youtube_report: 0개
  - _id_in_youtube_users: 0개
  - _id_in_alpha_tests: 0개
  - _id_in_youtube_channel_locations: 0개
  - _id_in_user_bookmarks: 0개
  - _id_in_campaigns: 0개

DataFrame 'youtube_revenue'에서의 매칭 정보:
  - _id_in_hashtag_hashtag: 0개
  - _id_in_youtube_report_v2: 0개
  - _id_in_youtube_channel_demographics: 0개
  - _id_in_user_aggregations: 0개
  - _id_in_campaign_bookmarks: 0개
  - _id_in_hashtag_hashtaglog: 0개
  - _id_in_youtube_report: 0개
  - _id_in_youtube_users: 0개
  - _id_in_alpha_tests: 0개
  - _id_in_youtube_channel_locations: 0개
  - _id_in_user_bookmarks: 0개
  - _id_in_campaigns: 0개
  - youtube_user_id_in_youtube_report_v2: 1369개
  - youtube_user_id_in_youtube_channel_demographics: 85802개
  - youtube_user_id_in_user_aggre

In [19]:
# Names of the collections currently held in youtube_dict.
youtube_dict.keys()

dict_keys(['youtube_revenue', 'youtube_report_v2', 'youtube_channel_demographics', 'youtube_videos', 'youtube_report', 'youtube_users', 'youtube_channel_locations', 'youtube_daily_channel_basics', 'youtube_subscriber', 'youtube_datas'])

In [54]:
# Collect every value of youtube_dict that is a DataFrame, in dictionary order.
dfs_to_merge = [
    frame
    for frame in youtube_dict.values()
    if isinstance(frame, pd.DataFrame)
]

In [None]:
# Example: merge several DataFrames with Dask.
# Convert every pandas DataFrame to a Dask DataFrame.
dask_df_list = [dd.from_pandas(df, npartitions=10) for df in dfs_to_merge]

# Start from the first *Dask* DataFrame. Previously the loop merged the pandas frames from
# dfs_to_merge, so dask_df_list was never used and the Dask conversion had no effect.
merged_ddf = dask_df_list[0]

# Merge the remaining frames one by one.
# The collections share column names such as '_id', 'created_at', 'updated_at' and '__v';
# with the default '_x'/'_y' suffixes the same suffixed name is produced again on a later
# merge, so each step gets its own suffix ('_1', '_2', ...) for the right-hand columns.
# NOTE(review): youtube_user_id is presumably not unique within each collection
# (TODO confirm); if so every step is a many-to-many join and the row count can grow
# multiplicatively.
for position, ddf in enumerate(dask_df_list[1:], start=1):
    merged_ddf = dd.merge(
        merged_ddf,
        ddf,
        on='youtube_user_id',
        how='outer',
        suffixes=('', f'_{position}'),
    )

# Evaluate the lazy merge and convert the result to a pandas DataFrame.
merged_df = merged_ddf.compute()

# Show the result
print(merged_df)


In [None]:
import pandas as pd
from functools import reduce

# Collect every value of youtube_dict that is a DataFrame.
dfs_to_merge = [frame for frame in youtube_dict.values() if isinstance(frame, pd.DataFrame)]

if not dfs_to_merge:
    raise ValueError("youtube_dict contains no DataFrames to merge.")


def _merge_on_user_id(left, indexed_right):
    """Inner-join ``left`` with the next frame on 'youtube_user_id'.

    ``indexed_right`` is a ``(position, frame)`` pair. The position is used as the suffix
    for overlapping right-hand columns, so shared columns such as '_id' or 'created_at'
    never end up with the same suffixed name twice across successive merges (which the
    default '_x'/'_y' suffixes do).
    """
    position, right = indexed_right
    return pd.merge(
        left,
        right,
        on='youtube_user_id',
        how='inner',
        suffixes=('', f'_{position}'),
    )


# Merge the DataFrames left to right, starting from the first one.
merged_df = reduce(_merge_on_user_id, enumerate(dfs_to_merge[1:], start=1), dfs_to_merge[0])

# Show the result
print(merged_df)