# Most harmless user function of Python


In [2]:
from typing import List
import pandas as pd
def merge_df_list(
    df_left: pd.DataFrame, 
    dfs_right: List[pd.DataFrame], 
    keys: List[str], 
    methods: List[str]) -> pd.DataFrame:
    """
    功能：横向合并多个df
    参数：
    df_left: 最左边的df
    df_list：需要合并的df列表(除最左边的以外)
    keys：合并df所需要的key列表，需要与df_list一一对应, 
        列表元素为一个二元元组，元组元素为str列表
    methods: 合并df所需要的方法列表，需要与df_list一一对应

    返回值：合并后的df
    """
    # 将最左边的数据帧赋值给df_merged
    df_merged = df_left
    # 使用zip函数同时迭代df_right，keys和methods列表
    for df, key, method in zip(dfs_right, keys, methods):
        # 使用指定的键和方法合并当前数据帧与df_merged
        df_merged = df_merged.merge(df, left_on=key[0], 
                        right_on=key[1], how=method)
    # 返回合并后的数据帧
    return df_merged

# 示例代码：
if __name__ == "__main__":
    df1 = pd.DataFrame({"A": [1, 2, 3], "B": [1, 2, 6]})
    df2 = pd.DataFrame({"B": [1, 2, 9], "C": [10, 11, 12]})
    df3 = pd.DataFrame({"C": [10, 3, 15], "D": [16, 17, 18]})


    df_left = df1
    df_right = [df2, df3]
    keys = [("B","B"),("C","C")]
    methods = ['inner','outer']

    merged_df = merge_df_list(df_left,df_right, keys,methods)
    print(f"df1: \n {df1} \n")
    print(f"df1: \n {df2} \n")
    print(f"df1: \n {df3} \n")
    print(f"merged df: \n {merged_df} \n")


df1: 
    A  B
0  1  1
1  2  2
2  3  6 

df1: 
    B   C
0  1  10
1  2  11
2  9  12 

df1: 
     C   D
0  10  16
1   3  17
2  15  18 

merged df: 
      A    B   C     D
0  1.0  1.0  10  16.0
1  2.0  2.0  11   NaN
2  NaN  NaN   3  17.0
3  NaN  NaN  15  18.0 



### 批量扫描文件夹并获取文件路径


In [3]:
import re
import os
from typing import List, Generator

def scan_file_path(
    folder: str, 
    extensions: List[str], 
    exclude: str="^$", 
    recursive: bool=False) -> Generator[str, None, None]:
    """生成器函数，用于生成符合指定条件的文件路径。

    Args:
        folder (str): 要扫描的文件夹。
        extensions (List[str]): 要匹配的文件扩展名列表。
        exclude (str, optional): 要排除的通配符模式。默认为 "^$"(完全不排除)。
        recursive (bool, optional): 是否递归到子目录。默认为 False。

    Yields:
        str: 文件路径。
    """
    # 验证输入文件夹
    if not os.path.isdir(folder):
        raise ValueError("'{}' 不是存在的文件夹".format(folder))
    # 使用 os.scandir 遍历文件夹中的条目
    with os.scandir(folder) as it:
        for entry in it:
            # 检查条目是否为文件，并且其名称是否与扩展名和排除模式匹配
            if (entry.is_file() 
                and entry.name.endswith(tuple(extensions)) 
                and not re.search(exclude, entry.path)):
                # 输出文件路径
                yield entry.path
            # 检查条目是否为目录，并且是否启用递归扫描
            elif entry.is_dir() and recursive:
                # 递归到子目录
                yield from scan_file_path(entry.path, extensions, exclude, recursive)



# 示例代码
if __name__ == "__main__":
  extensions = ['.csv','.dta']
  folder = "." 
  exclude = r".csv$"
  paths1 = scan_file_path(folder,extensions,recursive=True)
  paths2 = scan_file_path(folder,extensions,recursive=True,exclude=exclude)

  for path in paths1:
    print(f"未排除csv文件的路径如下: \n {path} \n")

  for path in paths2:
    print(f"排除csv文件的路径如下: \n {path} \n")

未排除csv文件的路径如下: 
 ./assets/auto.dta 

未排除csv文件的路径如下: 
 ./assets/auto.csv 

排除csv文件的路径如下: 
 ./assets/auto.dta 



In [4]:
from typing import Dict, Any
def dict_to_df(_dict: Dict[Any, Any], key_name: str, value_name: str) -> pd.DataFrame:
    """
    字典转换为一个dataframe, 字典键对应第一列，字典值第二列。

    参数：
    _dict (Dict[Any, Any]): 字典。
    key_name (str): 字典的keys对应的列名。
    value_name (str): 字典的values对应的列名。
    
    返回值：
    df(pd.DataFrame): 一个两列dataframe，第一列对应字典的keys，第二列对应字典的values。
    """
    df = (pd.DataFrame.from_dict(_dict, orient='index', columns=[value_name])
        .rename_axis(key_name))
  
    return df

if __name__ == "__main__":
    # 定义测试字典
    test_dict = {'a': 1, 'b': 2, 'c': 3}

    # 调用 dict_to_df 函数
    df = dict_to_df(test_dict, 'key', 'value')

    # 打印输出结果，查看是否符合预期
    print(f"{test_dict} \n")
    print(f"{df} \n")


{'a': 1, 'b': 2, 'c': 3} 

     value
key       
a        1
b        2
c        3 



In [5]:
import pandas as pd
from typing import Generator, List
import numpy as np

def read_stata(
    file_path: str, 
    chunksize: int=None, 
    keep_data: str = 'all', 
    convert_categoricals:bool=False,
    preserve_dtypes:bool=False,
    convert_missing:bool=True,
    usecols:List[str]|None=None,
    dtype: List[str] | str | None=None) -> Generator:
    """
    读取 Stata 文件并返回数据和标签。
    
    参数:
    file_path (str): Stata 文件的路径。
    chunksize (int): 读取stata文件的数据块的大小。
    keep_data (str): 保留数据的类型, 'all'为数据和标签，'only_label'仅标签，
                    'only_data'仅数据，默认为'all'。
    convert_categoricals (bool): 是否转换原始值为值标签对应值，默认值为False。
                    注意，有些文件转换会报错。
    convert_missing (bool): 是否以stata缺失值类型存储，默认值为True。
    usecols (List[str]|None): 保留的列，默认值None保留所有列。
    dtype (List[str]||None): 制定列的数据类型，默认为None。

    
    返回值:
    Generator: 返回一个(DataFrame)生成器。
    """
    
    # 创建 StataReader 并设置参数。
    reader = pd.read_stata(
        file_path, 
        chunksize=chunksize, 
        convert_categoricals=convert_categoricals,
        preserve_dtypes=preserve_dtypes,
        convert_missing=convert_missing,
        columns=usecols)
    # 如果保留标签信息
    if keep_data in ['all','only_label']:
        # 获取Stata文件的变量标签dataframe
        variable_labels = dict_to_df(
            reader.variable_labels(),
            key_name='_column_name',
            value_name='_column_label').reset_index()
        # 获取Stata文件的值标签dataframe
        value_labels = dict_to_df(
            reader.value_labels(),
            key_name='_value_label_name',
            value_name='_value_label').reset_index()
        # Outer横向合并生成标签dataframe
        label = pd.merge(
            variable_labels,
            value_labels,
            left_on='_column_name',
            right_on='_value_label_name',
            how='outer',
            copy=False).astype(str)

        if keep_data == "only_label":
            # 仅返回label信息
            return label
            
        else: 
            # 返回包含标签和数据的信息
            for df in reader:
                labels = pd.concat([label,df],axis=1,join='outer')
                if dtype:
                    yield labels.astype(dtype)
                else:
                    yield labels
    elif keep_data == "only_data":
        # 仅返回数据数据dataframe块
        for df in reader:
            if dtype:
                yield df.astype(dtype)
            else:
                yield df
    
    else:
        # 返回错误
        raise ValueError(f"paramter 'keep_data' got an unexpected value '{keep_data}'")


# 函数调用示例：
if __name__ == "__main__":
    # 生成两个示例的df chunks生成器
    data1 = read_stata("./assets/auto.dta",
            chunksize=50,dtype='str')
    data2 = read_stata("./assets/auto.dta",
            chunksize=50,keep_data="only_label")
    data3 = read_stata("./assets/auto.dta",chunksize=50,
            keep_data="only_data")
   
    # # 遍历生成器
    for index, df in enumerate(data1):
        print(f"生成器data1中的df块{index+1}:\n {df.head(5)}")
        print(df.dtypes, "\n")
    for index, df in enumerate(data2):
        print(f"生成器data2中的df块{index+1}:\n {df.head(5)}")
    for index, df in enumerate(data3):
        print(f"生成器data3中的df块{index+1}:\n {df.head(5)}")


生成器data1中的df块1:
   _column_name       _column_label _value_label_name _value_label  \
0         make      Make and model               nan          nan   
1        price               Price               nan          nan   
2          mpg       Mileage (mpg)               nan          nan   
3        rep78  Repair record 1978               nan          nan   
4     headroom      Headroom (in.)               nan          nan   

            make price mpg rep78 headroom trunk weight length turn  \
0    AMC Concord  4099  22     3      2.5    11   2930    186   40   
1      AMC Pacer  4749  17     3      3.0    11   3350    173   40   
2     AMC Spirit  3799  22     .      3.0    12   2640    168   35   
3  Buick Century  4816  20     3      4.5    16   3250    196   40   
4  Buick Electra  7827  15     4      4.0    20   4080    222   43   

  displacement          gear_ratio foreign  
0          121  3.5799999237060547       0  
1          258  2.5299999713897705       0  
2          1

In [6]:
import sqlite3

def get_table_names(db_file: str) -> List[str]:
    """获取给定数据库中的表名列表。

    Args:
        db_file: 数据库文件的路径。

    Returns:
        数据库中的表名列表。
    """
    # 获取数据库中的所有表名
    with sqlite3.connect(db_file) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        table_names = [table[0] for table in cursor]

    # 如果给定的表名在列表中，则返回 True，否则返回 False
    return table_names

# 示例调用
print(get_table_names('my_database.db'))  # 输出：True 或 False



['table1_1', 'table2_1', 'table3_1', 'table2', 'table3']


In [7]:
import sqlite3

def get_unique_name(name: str, name_list: List[str]) -> str:
    """获取唯一的名字。

    如果给定的名字已经在列表中，则提示用户输入新的名字，直到输入的名字在列表中不存在为止。返回不存在于列表中的名字。

    参数:
    name: str 需要检查的名字。
    name_list: List[str] 名字列表。

    返回:
    str: 不存在于列表中的名字。
    """
    new_name = name
    while new_name in name_list:
        new_name = input("请输入一个新的名字: ")
    return new_name

def delete_table(table_name: str, db_file: str):
    """
    在指定数据库文件中删除指定的表。

    参数:
    - table_name: 需要删除的表的名称。
    - db_file: 数据库文件的路径。
    """
    with sqlite3.connect(db_file) as conn:
        c = conn.cursor()
        c.execute(f"DROP TABLE {table_name}")

def name_table(name: str, db_file: str, delete_existing_table: bool = False) -> str | None:
    """
    为 SQLite 数据库表命名。如与原表名冲突，则选择要么重命名要么删除原表。

    参数：
    - name (str): 用户定义的名称。
    - db_file (str): SQLite 数据库的文件路径。
    - delete_existing_table (bool): 当表存在时，False（默认）为用户输入新名，True 为删除原表。

    返回值：
    - str | None: 表名或 None。
    """
    # 获取 SQLite 数据库中的全部表名
    table_names = get_table_names(db_file)
    
    # 当命名与已有表名冲突时
    if name in table_names:
        # 删除原表或输入新名字
        return delete_table(name, db_file) if delete_existing_table else get_unique_name(name, table_names)
    else:
        # 返回原名
        return name


# 函数调用示例：
if __name__ == "__main__":
    table_name = name_table('table1',"my_database.db",delete_existing_table=True)
    print(table_name,'\n')



table1 



In [1]:
import functools
import os
import pandas as pd
from typing import Tuple, List, Generator
def read_data(
    filepath: str, 
    chunksize: int= None, 
    usecols: List[str]= None,
    dtype:List[str] | str= 'str')->Generator[Tuple[str, pd.DataFrame],None,None]:
    """
    读取文件数据
    
    参数:
    filepath: str 文件路径。
    chunksize: int, (optional) 块大小，用于指定读取数据文件时分块的大小。如果未指定，则不会将数据文件分块。
    usecols: List[str], (optional) 列名列表，用于指定导入的列。如果未指定，则为全部列。
    dtype: List[str] | str, (optional) 列数据类型或其列表，用于制定列的数据类型，默认全部为'str'。
        
    Yields:
    Generator[Tuple[str, pd.DataFrame],None,None] 生成器，包含文件名和数据块的元组。
    """
    # 获取文件名和文件扩展名
    filename, file_extension = os.path.splitext(filepath)

    # 定义将文件扩展名映射到读取函数的字典
    extensions_to_readers = {
        '.csv': pd.read_csv,
        '.dta': read_stata,
        '.xlsx': pd.read_excel
    }

    # 使用 lru_cache 装饰器缓存每种文件扩展名的读取函数的结果
    @functools.lru_cache()
    def reader(ext):
        return extensions_to_readers.get(ext)

    # 从缓存中查找读取函数
    reader_fn = reader(file_extension)

    # 检查文件扩展名是否受支持
    if reader_fn is None:
        raise ValueError(f"Unsupported file extension: {file_extension}")

    # 使用指定的块大小以块的方式读取数据文件
    for chunk in reader_fn(filepath, chunksize=chunksize, iterator=True):
        # 将文件名和 DataFrame 块作为生成器的一部分生成
        yield filename, chunk



In [9]:
from typing import Union, Iterator

def write_df_to_sqlite(
    df: Union[pd.DataFrame, Iterator[pd.DataFrame]],
    db_file: str, 
    table_name: str, 
    chunk_size: Union[int, None]=None) -> None:
  # 连接数据库
  with sqlite3.connect(db_file) as conn:
    # 将数据帧按块写入数据库表中
    chunk_iter = df if isinstance(df, Iterator) else df.groupby(np.arange(len(df)) // chunk_size) \
        if chunk_size else [df]
    if_exists = 'replace' if isinstance(df, pd.DataFrame) else 'append'
    for chunk in chunk_iter:
      # 获取数据帧块
      chunk_df = chunk if isinstance(df, Iterator) else chunk[1]
      # 将数据帧块写入数据库表中
      chunk_df.to_sql(table_name, conn, if_exists=if_exists)
      # 提交事务
      conn.commit()



In [1]:
from Data import DataCleaner
import pandas as pd

folder = '/mnt/Data/data/CFPS/CFPS_tidy/csv' 
dc= DataCleaner()

paths = dc.scan_file_path(folder)

dc.import_csv_to_sqlite(paths,'test1.db')
# datas =  dc.read_data(paths)

# dc.write_to_sqlite(datas,'test.db')



# for path in data_paths:
#     print(path)
# datas = dc.read_data(paths)

# for name, df in datas:
#     print(name)
#     display(df)
# dc.write_to_db(datas,'test.db')

# for path in paths:
#     print(path)


# for chunk in dc._read_stata('/mnt/Data/data/CFPS/CFPS_tidy/child2012.dta'):
#     print(chunk)

In [1]:
from Data import DataCleaner


folder = '/mnt/Data/data/CFPS/CFPS_tidy/csv' 
ldc= LightDataCleaner()

paths = ldc.scan_csv_path(folder)

ldc.import_csv_to_sqlite(paths,'cfps.db')

In [1]:
from Data import DataCleaner

dc = DataCleaner()
data = dc._read_stata('/mnt/Data/data/CFPS/CFPS_tidy/child2010_v201906.dta',keep_data='only_label')
