In [2]:
import math
import asyncio
import numpy as np
import pandas as pd
from index import Index
from logger import logger
from sqlalchemy import text
from datetime import datetime
from calculator import Calculator
from pandas.core.frame import DataFrame
from typing import Any, List, Union, Optional
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

pd.options.mode.chained_assignment = None

IndexCalculator 是计算器末端类，其继承了父类Index类中的SQL语句和组装方法，以及爷类Battery类中的数据库交互方法。
db_columns_comments 是计算结果存储表中各字段的描述性评论，
通过更改字典key对应的值，表中的描述将在更改后的更新中改变。

In [3]:
from calculators import uscalculator

In [None]:
class IndexCalculator(Index, Calculator):
    """Calculator for the category indexes of one Amazon marketplace site.

    Use 'IndexCalculator._methods()' to see the methods it contains.
    """

    # Description of every column of the result table. The values are used
    # verbatim as SQL column comments when results are stored, so editing a
    # value here changes the comment written on the next store.
    db_columns_comments = {
        "category_id": "亚马逊品类ID",  # Amazon category ID
        "maincategory": "亚马逊品类名称",  # Amazon category name
        "site": "亚马逊站点",  # Amazon site
        "amv": "品类市场容量(当地货币)",  # category market volume (local currency)
        "cii": "竞争激烈指数：该指数越高则说明当前品类竞争越为激烈",  # competitive intensity index
        "cci": "行业集中指数：该指数越高则说明当前品类寡头垄断程度越严重",  # industry concentration index
        "pqi": "商品品质指数：该指数越高则说明当前品类下的商品平均品质越高",  # product quality index
        "nvpi": "新品友好指数：该指数越高则说明当前品类下新品市场份额占比越高",  # new-product friendliness index
        "cvr": "中国卖家销售额占比: 该比例计算当前品类下中国卖家的销售额占比",  # revenue share of Chinese sellers
        "avr": "亚马逊自营销售额占比: 该比例结算当前品类下亚马逊自营的销售额占比",  # revenue share of Amazon first-party
        "asr": "亚马逊自营比例：该比例越高则说明当前品类下卖家为亚马逊自营比列越高",  # share of Amazon first-party sellers
        "csr": "中国卖家比列：该比例越高则说明当前品类下商品为中国卖家所售比例越高",  # share of products sold by Chinese sellers
        "remark_used": "计算使用的数据的标识日期，取最近一次有效标识",  # date tag of the data used for the calculation
        "created_at": "计算完成并存入数据库的日期",  # date the result was computed and stored
    }

In [12]:
uscalculator.db_columns_comments['site']

'亚马逊站点'

### init方法
init方法是Indexcalculator的构造方法，其接受site参数和db参数，
site是必须参数，代表着不同的亚马逊站点，通过改变site参数我们能控制indexcalculator
计算的亚马逊站点。   
db参数的默认值为local，改变db参数会改变链接的数据库,local代表本地数据库，cloud代表华为云数据库

### getitem方法
getitem方法使Indexcalculator的实例支持切片操作，返回最近可用的remarks

### repr方法
repr方法使Indexcalculator的实例支持打印操作，返回当前实例的站点信息及可用方法

In [11]:
    def __init__(self, site: str, db: str="local"):
        """Create a calculator bound to one site.

        Args:
            site: Site code; stored lower-cased on ``self.site``.
            db: Forwarded unchanged to the parent constructor as ``db``.
        """
        super(IndexCalculator, self).__init__(db=db)
        self.site = site.lower()
        # Today's date as "YYYY-MM-DD"; used as the reference "now" by the
        # remark-age check and the days-since-release computation.
        self._nowtime = datetime.today().strftime("%Y-%m-%d")

    def __getitem__(self, position: int) -> Union[List[str], str]:
        """Index into the cached remarks (``self._remarks``).

        ``self._remarks`` is only assigned by the ``remarks`` property, so that
        property must have been awaited at least once before indexing.
        NOTE(review): ``_remarks`` is a DataFrame, so a slice returns a
        DataFrame of rows rather than the annotated ``List[str]``/``str``.
        """
        return self._remarks[position]

    def __repr__(self) -> str:
        """Describe the instance: its site plus the names from ``_methods()``."""
        # The method names are joined into one comma-separated string and then
        # wrapped in a single-element list, so they print as "['a, b, c']".
        return f""" An index calculator of the {self.site} site
                methods: {[', '.join(i for i in IndexCalculator._methods())]} \n
               """
print(uscalculator[2:3])
print('----')
print(uscalculator)

                         remark
2  all_categories_us_2021-04-15
----
 An index calculator of the us site
                methods: ['_get_data_checked, _getcci, _getcii, _getnvpi, _getpqi, _methods, calculate, db_assemble, get_data_by_id, get_data_cleaned, get_data_fixed, get_data_length, get_indexes, get_nrows_data, post_result_db, set_column_types, set_data_sql, set_id_sql, set_last_sql, set_length_sql, set_ms_sql, set_remark_sql, set_sr_sql'] 

                


### remarks和remark方法
remarks方法会连接数据库并查询当前站点最近的5个remarks  
remark方法会基于remarks返回的5个remarks进行判断，  
如果最新的remark已经出现超过两天，则判断其数据已经爬取完毕，可以进行使用  
并将其设置为默认remark
否则，使用最近的第二个remark，并将其设置为默认remark.  
 remarks 和 remark方法都是异步方法，也是property装饰的attribute。

In [None]:
    @property
    async def remarks(self) -> DataFrame:
        """Query the remarks of this site and keep the first five rows.

        The SQL comes from ``set_remark_sql(self.site)`` and is stored on
        ``self.remark_sql``; the truncated result is cached on
        ``self._remarks`` and returned. Row order is whatever the query
        returns (presumably newest first — defined by ``set_remark_sql``,
        which is not visible here).
        """
        self.remark_sql = self.set_remark_sql(self.site)
        async with self.async_db_engine.connect() as connection:
            cursor = await connection.execute(text(self.remark_sql))
            fetched = pd.DataFrame(cursor.fetchall(), columns=cursor.keys())
            self._remarks = fetched.head(5)
        return self._remarks

    @property
    async def remark(self) -> str:
        """Return the remark that is safe to use and cache it on ``self._remark``.

        The last 10 characters of a remark are parsed as its date. If the first
        remark returned by ``remarks`` is more than two days older than
        ``self._nowtime`` it is used; otherwise the second one is used.

        Raises:
            IndexError: if the query returns no remark, or only one remark
                that is not yet more than two days old.
        """
        # Fetch once: awaiting ``self.remarks`` repeatedly re-runs the query and
        # could compare one result set while returning a value from another.
        remarks = await self.remarks
        newest = remarks.iat[0, 0]
        age_in_days = (
            pd.to_datetime(self._nowtime) - pd.to_datetime(newest[-10:])
        ).days
        if age_in_days > 2:
            self._remark = newest
        else:
            self._remark = remarks.iat[1, 0]
        return self._remark

In [15]:
await uscalculator.remark

'all_categories_us_2021-04-18'

### last_update_date方法和is_data_ready方法
last_update会查询计算结果表中的remark_used字段  
is_data_ready会将last_update返回的结果与最新的remark比较  
判断数据是否重复，以此避免重复计算。  

In [None]:
    @property
    async def last_update_date(self) -> datetime:
        """Return the date produced by the "last update" query.

        The SQL comes from ``set_last_sql(self.site)`` and is stored on
        ``self.last_sql``. The first value of the first returned row is
        converted with ``pd.to_datetime``, cached on
        ``self._last_update_date`` and returned.
        """
        self.last_sql = self.set_last_sql(self.site)
        async with self.async_db_engine.connect() as connection:
            cursor = await connection.execute(text(self.last_sql))
            fetched = pd.DataFrame(cursor.fetchall(), columns=cursor.keys())
            first_value = fetched.iloc[0][0]
            self._last_update_date = pd.to_datetime(first_value)
        return self._last_update_date

    @property
    async def is_data_ready(self) -> bool:
        """Tell whether the usable remark is newer than the last stored update.

        The date embedded in the last 10 characters of ``remark`` is stored on
        ``self._pre_update_date`` and compared with ``last_update_date``
        (stored on ``self._last_update_date``). The boolean result is cached on
        ``self._is_data_ready`` and returned.
        """
        usable_remark = await self.remark
        self._pre_update_date = pd.to_datetime(usable_remark[-10:])
        self._last_update_date = await self.last_update_date
        self._is_data_ready = self._pre_update_date > self._last_update_date
        return self._is_data_ready

In [16]:
await uscalculator.is_data_ready

True

### get_data_length方法
get_data_length方法使用当前remark对数据库中数据的行数进行查询，   
将返回一个整数
### get_data_by_id方法
get_data_by_id方法接受category_id，remark和getindexes作为参数。  
以string的格式输入单个category_id，或者以元组的形式输入多个category_id，  
此方法将会根据这些category_id和remark查询计算所需的原始数据。  
如果remark没有被输入，则会使用默认remark。  
getindexes参数是一个boolean值，如果为True，则会基于给定category_id的数据  
进行指数计算，并返回计算结果。

In [None]:

    async def get_data_length(self, remark: str) -> int:
        """Return the number of rows stored for ``remark``.

        The count is read from the first column of the first row returned by
        the query; the SQL text is kept on ``self.data_len_sql``. An empty
        result set or a NULL count yields 0.
        """
        # NOTE(review): the SQL is built with ``set_data_sql`` although the
        # class also exposes ``set_length_sql``; confirm which is intended.
        self.data_len_sql = self.set_data_sql(remark=remark)
        async with self.async_db_engine.connect() as conn:
            result = await conn.execute(text(self.data_len_sql))
            rows = result.fetchall()
        if not rows:
            return 0
        # Apply the fallback before int(): ``int(None) or 0`` would raise.
        return int(rows[0][0] or 0)
        
    async def get_data_by_id(
        self,
        *,
        category_id: Union[tuple, str],
        remark: Optional[str] = None,
        getindexes: bool = False,
    ) -> DataFrame:
        """Fetch the stored rows of one or several categories.

        Args:
            category_id: a single category id as ``str`` or a non-empty tuple
                of ids.
            remark: remark to query. When omitted, the remark already set on
                the instance is used; if none is set yet the default
                ``remark`` is fetched.
            getindexes: when True, the fetched rows are cleaned and passed to
                ``calculate`` and the calculated result is returned instead.

        Returns:
            The raw rows, or the calculated indexes when ``getindexes`` is
            True. If the calculation fails, the error is logged and whatever
            ``self.data`` holds at that point is returned (best effort).

        Raises:
            TypeError: if ``category_id`` is neither a ``str`` nor a non-empty
                tuple.
        """
        if remark:
            self._remark = remark
        elif getattr(self, "_remark", None) is None:
            self._remark = await self.remark

        # NOTE(review): ids are interpolated into the SQL text by set_id_sql;
        # only pass trusted values.
        if isinstance(category_id, str):
            category_tuple = f"""('{category_id}')"""
        elif isinstance(category_id, tuple) and len(category_id) > 1:
            category_tuple = category_id
        elif isinstance(category_id, tuple) and len(category_id) == 1:
            # repr of a 1-tuple is "('x',)", which is not valid SQL.
            category_tuple = f"""('{category_id[0]}')"""
        else:
            raise TypeError("Input type has to be either tuple or str")
        self.id_sql = self.set_id_sql(category_tuple=category_tuple, remark=self._remark)

        async with self.async_db_engine.connect() as conn:
            result = await conn.execute(text(self.id_sql))
            self.data = pd.DataFrame(result.fetchall(), columns=result.keys())

        if getindexes:
            try:
                self.data = await self.calculate(
                    df=self.get_data_cleaned(df=self.data),
                    date_check=False,
                    remark=self._remark,
                )
                logger.info(f"passing parameter remark {self._remark}")
            except Exception:
                # Best effort: keep returning data, but record the traceback and
                # let cancellation / KeyboardInterrupt propagate.
                logger.exception('an error occured when calculating indexes')
        return self.data

In [25]:
await uscalculator.get_data_by_id(remark='all_categories_us_2021-04-14',category_id='10049077011')

Unnamed: 0,maincategory,category_id,amv,asin,ranking,rating,mrevenue,release_date
0,Cupcake Carriers,10049077011,1318122,B07D8FNKHM,1,4.8,124917,05/31/2018
1,Cupcake Carriers,10049077011,1318122,B082VMS17F,2,4.7,77503,12/17/2019
2,Cupcake Carriers,10049077011,1318122,B07S646QPH,3,4.7,52481,05/23/2019
3,Cupcake Carriers,10049077011,1318122,B073F8WP69,4,3.8,15373,06/28/2017
4,Cupcake Carriers,10049077011,1318122,B00HXCI6PO,5,4.6,46022,01/14/2014
...,...,...,...,...,...,...,...,...
96,Cupcake Carriers,10049077011,1318122,B07FMZDVVN,97,3.7,3804,12/06/2018
97,Cupcake Carriers,10049077011,1318122,B08BX7ZMSB,98,4.7,1610,11/03/2016
98,Cupcake Carriers,10049077011,1318122,B0892NH4T2,99,4.4,3358,05/22/2020
99,Cupcake Carriers,10049077011,1318122,,,,,


### get_nrows_data方法
get_nrows_data方法会接受remark, rows 和clean参数。  
remark参数将确定查询的依据.  默认为最新可用remark  
rows参数将确定查询的行数大小.   默认为remark全部数据  
clean参数将确定是否将数据进行清洗, 即是否调用get_data_cleaned方法。  
get_nrows_data是程序取值的默认方法， 也可以被外部直接调用.

In [None]:
    async def get_nrows_data(
        self,
        *,
        remark: Optional[str] = None,
        rows: Optional[int] = None,
        clean: bool = True,
    ) -> DataFrame:
        """Fetch ``rows`` rows of the data tagged with ``remark``.

        Args:
            remark: remark to query; defaults to the usable ``remark``.
            rows: number of rows to fetch; defaults to the full length
                reported by ``get_data_length`` for the remark.
            clean: when True the result is passed through
                ``get_data_cleaned`` before being returned.

        The chosen values are kept on ``self._remark`` and ``self.rows``. More
        than 2,000,000 rows are fetched in two pages. The result is stored on
        ``self.data`` and returned.
        """
        max_rows_per_query = 2000000

        # An explicit argument always wins; defaults are looked up only when
        # the argument is missing.
        self._remark = remark or await self.remark
        self.rows = rows or await self.get_data_length(remark=self._remark)

        if self.rows <= max_rows_per_query:
            interval = self.rows
            pages = [0, self.rows]
            logger.info(f"正在查询{self.rows}行{(self._remark)[-5:]}日{self.site.upper()}站数据")
        else:
            interval = int(self.rows / 2)
            pages = [0, interval, self.rows]
            logger.info(
                f"正在查询{self.rows}行{(self._remark)[-5:]}日{self.site.upper()}站数据，数据量太大, 将数据分为{pages}进行分页查询"
            )

        # One query per page; ``index`` selects the page boundary in ``pages``.
        frames = []
        for index in range(1, len(pages)):
            self.data_sql = self.set_data_sql(
                remark=self._remark, interval=interval, pages=pages, index=index
            )
            async with self.async_db_engine.connect() as conn:
                result = await conn.execute(text(self.data_sql))
                frames.append(pd.DataFrame(result.fetchall(), columns=result.keys()))
        df = pd.concat(frames, ignore_index=True)

        logger.info(f"已完成{self.rows}行查询")

        if clean:
            self.data = self.get_data_cleaned(df=df)
        else:
            self.data = df
        return self.data

In [6]:
await uscalculator.get_nrows_data(remark='all_categories_us_2021-04-19',rows=2000,clean=False)

Unnamed: 0,maincategory,category_id,amv,asin,ranking,rating,mrevenue,release_date
0,Bathroom Vanities,6810202011,3680245,B07GZNK5ZX,1,4.4,50826,2018-08-31
1,Rotary Lasers,553276,2744237,B017LTFSSG,1,4.7,559005,11/05/2015
2,Power Jointers,552824,1112195,B004Q0I8YU,1,4.4,482855,02/28/2011
3,Two-Piece Toilets,542639011,2386861,B07BSXNDD7,1,4.7,45762,2013-05-02
4,One-Piece Toilets,542638011,2987130,B0744CTXW8,1,4.4,221170,2017-07-21
...,...,...,...,...,...,...,...,...
1995,Bathroom Vanities,6810202011,3680245,B08116R2X4,58,0.0,65999,2019-11-04
1996,Rotary Lasers,553276,2744237,B00AFIMFZA,60,4.6,0,11/29/2012
1997,Power Jointers,552824,1112195,B08JHRYMLC,58,4.0,0,09/19/2020
1998,Two-Piece Toilets,542639011,2386861,B08ZJYCF2W,58,0.0,2999,2021-03-19


### _get_data_checked方法
_get_data_checked是一个受保护方法，不应该被外部调用，其检测数据的质量并记录在log文件中。
其中  
valid_dates_rate 表示有效日期占比：统计为空或为'0000-00-00'的日期数量，除以总行数，再用1减去该比例。  
valid_asin_ratio 表示有效营业额占比：基于营业额不低于1的ASIN行数与总行数之比计算。  
valid_cii 表示有效CII比例：分别计算每个品类排名前八的ASIN中有ranking的ASIN占比，以及该品类中有ranking的ASIN数量占最大ranking的比例，取两者中的较小值。

In [None]:
    def _get_data_checked(self, df: pd.DataFrame) -> DataFrame:
        """Log data-quality figures for ``df`` and repair it when needed.

        This method should not be called directly from outside.

        Side effects: ``df["ranking"]`` is replaced in place by its
        zero-filled integer version, and ``self.data`` is assigned.

        Returns:
            ``df`` itself, or the frame produced by ``get_data_fixed`` when
            the data has at least 200000 rows and the CII coverage is at most
            87.5%.
        """
        self.data = df
        total_rows = self.data.shape[0]
        categories = self.data["category_id"].nunique()

        invalid_dates = (
            self.data["release_date"].isnull().sum()
            + (self.data["release_date"] == "0000-00-00").sum()
        )
        valid_dates_rate = round((1 - invalid_dates / total_rows) * 100, 3)

        # Rows whose revenue is at least 1 count as valid; unparsable values
        # are treated as 0 instead of breaking the comparison.
        valid_rows = (
            pd.to_numeric(self.data["mrevenue"], errors="coerce").fillna(0) >= 1
        ).sum()

        self.data["ranking"] = self.data["ranking"].fillna(0).astype("int")
        # Coverage of the ranking data, averaged over categories; the smaller
        # of the two figures is reported.
        # NOTE(review): ``ranking <= 8`` also counts rows whose missing
        # ranking was filled with 0 — confirm this is intended.
        top8_coverage = round(
            (self.data[self.data["ranking"] <= 8].groupby("category_id").size() / 8).mean(),
            3,
        )
        ranked_coverage = round(
            (
                self.data[self.data["ranking"] >= 1].groupby("category_id").size()
                / self.data.groupby("category_id")["ranking"].max()
            ).mean(),
            3,
        )
        valid_cii = min(top8_coverage, ranked_coverage) * 100

        # Share of valid rows (previously the complement was logged under the
        # "valid ratio" label).
        valid_asin_ratio = round((valid_rows / total_rows) * 100, 3)

        if total_rows >= 200000 and valid_cii <= 87.5:
            self.data = self.get_data_fixed(df=self.data)

        logger.info(
            f"""已完成{self.data.shape[0]}行{self.site.upper()}站数据查询,\n
                共{categories}个品类\n
                其中有{valid_rows}行有效,\n
                有效比例为: {valid_asin_ratio}%,\n
                有效cii占比：{valid_cii}%,\n
                有效日期占比：{valid_dates_rate}%,\n
             """
        )
        # Return the possibly repaired frame; returning ``df`` would silently
        # drop the rows added by get_data_fixed.
        return self.data

### get_data_cleaned方法
get_data_cleaned方法在对数据进行清洗与整理之前自动执行_get_data_checked以记录数据质量  
之后，根据父类index cols_dtypes中对数据类型的定义，get_data_cleaned将针对各个类型进行  
缺失值补充，类型填补，和数据整理。
对于除ranking以外的数值类型的column，将使用平均值对其进行填补。  
此后，get_data_cleaned会针对每个品类进行检测，如果该品类营业额为0或为空的asin占所有asin的比例达到了一半以上，则不对该品类进行计算。  
get_data_cleaned 会将离散的日期类型转化为连续的数值类型，计算方法为将该日期减去当前日期并取天数差。

In [None]:
  def get_data_cleaned(self, df: pd.DataFrame) -> DataFrame:
        """Clean the raw data and prepare it for index calculation.

        Steps:
            1. Run ``_get_data_checked`` to log quality figures.
            2. Fill missing ``ranking`` with 0. For columns typed "int" or
               "float" in ``self.cols_dtypes``, fill missing values with the
               column mean, then truncate to int / round to one digit. Every
               column is finally cast to its ``cols_dtypes`` entry.
            3. Drop categories where at most half of the rows have
               ``mrevenue`` greater than 1; their ids are kept on
               ``self.invalid_categories``.
            4. Replace ``release_date`` with ``days_since_released``, the
               number of days between the release date and ``self._nowtime``.
               Missing, "0000-00-00" and unparsable dates give 0.

        The column conversions of step 2 are written into the frame returned
        by ``_get_data_checked``. The cleaned frame is stored on ``self.data``
        and returned.
        """
        df = self._get_data_checked(df=df)

        for column in df.columns:
            target_dtype = self.cols_dtypes[f"{column}"]
            if column == "ranking":
                df[column] = df[column].fillna(0)
            elif target_dtype in ("int", "float"):
                column_mean = pd.to_numeric(df[column], errors="coerce").mean()
                as_float = df[column].fillna(column_mean).astype("float")
                if target_dtype == "int":
                    df[column] = as_float.apply(lambda x: int(x))
                else:
                    df[column] = as_float.apply(lambda x: round(x, 1))
            df[column] = df[column].astype(target_dtype, errors="ignore")

        # we don't calculate categories which more than half of its all asins have no mrevenue
        total_cat_num = df.groupby("category_id").size()
        # reindex keeps the original key type; zero-filling through str(key)
        # misaligns non-string category ids.
        valid_cat_num = (
            df[df["mrevenue"] > 1]
            .groupby("category_id")
            .size()
            .reindex(total_cat_num.index, fill_value=0)
        )
        _ic = (valid_cat_num / total_cat_num) <= 0.5
        self.invalid_categories = _ic[_ic].index
        # Work on an explicit copy so the writes below never target a slice.
        df = df[~df["category_id"].isin(self.invalid_categories)].copy()
        dropped_size = len(self.invalid_categories)
        try:
            logger.info(f"dropped :{dropped_size} categories")
        except Exception:
            print(f"dropped :{dropped_size} categories")

        release = df["release_date"]
        missing = release.isna() | (release == "0000-00-00")
        release = release.mask(missing, self._nowtime).astype(str)

        # Two formats occur in the data: "MM/DD/YYYY" and ISO-like dates.
        is_slash = release.str.count("/") == 2
        slash_dates = pd.to_datetime(
            release.where(is_slash), format="%m/%d/%Y", errors="coerce"
        )
        other_dates = pd.to_datetime(release.where(~is_slash), errors="coerce")
        # Combine by position to stay independent of index labels.
        release_ts = other_dates.where(~is_slash, slash_dates.to_numpy())

        df["days_since_released"] = (
            (pd.to_datetime(self._nowtime) - release_ts).dt.days.fillna(0).astype(int)
        )
        df = df.drop(columns="release_date")
        self.data = df
        return self.data

In [7]:
uscalculator.get_data_cleaned(df=uscalculator.data)

  return lambda x: Timestamp(x, freq=self.freq, tz=self.tz)
  return lambda x: Timestamp(x, freq=self.freq, tz=self.tz)
  return lambda x: Timestamp(x, freq=self.freq, tz=self.tz)
  return lambda x: Timestamp(x, freq=self.freq, tz=self.tz)


Unnamed: 0,maincategory,category_id,amv,asin,ranking,rating,mrevenue,days_since_released
0,Bathroom Vanities,6810202011,3680245,B07GZNK5ZX,1,4.4,50826,964
1,Rotary Lasers,553276,2744237,B017LTFSSG,1,4.7,559005,1994
2,Power Jointers,552824,1112195,B004Q0I8YU,1,4.4,482855,3705
3,Two-Piece Toilets,542639011,2386861,B07BSXNDD7,1,4.7,45762,2911
4,One-Piece Toilets,542638011,2987130,B0744CTXW8,1,4.4,221170,1370
...,...,...,...,...,...,...,...,...
1995,Bathroom Vanities,6810202011,3680245,B08116R2X4,58,0.0,65999,534
1996,Rotary Lasers,553276,2744237,B00AFIMFZA,60,4.6,0,3065
1997,Power Jointers,552824,1112195,B08JHRYMLC,58,4.0,0,214
1998,Two-Piece Toilets,542639011,2386861,B08ZJYCF2W,58,0.0,2999,33


### get_data_fixed方法
get_data_fixed方法通过机器学习的方法对数据进行填补  
为了降低计算复杂度和耗时，get_data_fixed当前仅使用LR单模型  
对每个品类中排名前八的ASIN销售额进行填补。  
通过ranknum参数可以调整填补的排名范围（例如ranknum=9表示填补排名1至8的ASIN）。  
以get_data_fixed方法填补的row，其ASIN名称会被标注为'Fixed'

In [None]:
    def get_data_fixed(self, df: pd.DataFrame, ranknum: int = 9) -> DataFrame:
        """Add rows for missing top rankings, with revenue from Linear Regression.

        For every category, a ``LinearRegression`` of ``mrevenue`` on
        ``ranking`` is fitted on a 70% split of the category rows. For each
        rank in ``1 .. ranknum - 1`` that has no row, a new row is appended: a
        copy of the category's first row with that ``ranking``, the absolute
        integer prediction as ``mrevenue`` and ``asin`` set to "Fixed".

        Args:
            df: data to repair.
            ranknum: exclusive upper bound of the ranks to fill; 9 means ranks
                1 to 8.

        Categories with fewer than two rows cannot be split and are skipped.
        The result is stored on ``self.data`` and returned; when rows were
        added its index is renumbered.
        """
        originalrows = df.shape[0]
        fullrank = set(range(1, ranknum))
        fixed_rows = []

        for cat_id in df["category_id"].unique():
            cat_rows = df[df["category_id"] == cat_id]
            # Honour ``ranknum`` rather than a hard-coded 8.
            present = set(cat_rows.loc[cat_rows["ranking"] < ranknum, "ranking"])
            missing_ranks = sorted(fullrank - present)
            if not missing_ranks:
                continue
            if len(cat_rows) < 2:
                # train_test_split would leave an empty training set.
                logger.info(f"category {cat_id} has too few rows to be fixed")
                continue

            X = cat_rows[["ranking"]].values
            y = cat_rows["mrevenue"].values
            X_train, _, y_train, _ = train_test_split(
                X, y, test_size=0.3, random_state=0
            )
            regressor = LinearRegression()
            regressor.fit(X_train, y_train)
            predictions = regressor.predict([[rank] for rank in missing_ranks])

            template = cat_rows.iloc[0].to_dict()
            for rank, predicted in zip(missing_ranks, predictions):
                fixed_row = dict(template)
                fixed_row["ranking"] = rank
                fixed_row["mrevenue"] = abs(int(predicted))
                fixed_row["asin"] = "Fixed"
                fixed_rows.append(fixed_row)

        if fixed_rows:
            # Append all new rows at once instead of growing df row by row.
            df = pd.concat(
                [df, pd.DataFrame(fixed_rows, columns=df.columns)],
                ignore_index=True,
            )
        try:
            logger.info(f"已完成数据填补工作，通过线性回归修补了{len(df) - originalrows}行数据,现在开始指数计算")
        except Exception:
            print(f"已完成数据填补工作，通过线性回归修补了{len(df) - originalrows}行数据,现在开始指数计算")
        self.data = df

        return self.data


In [8]:
uscalculator.get_data_fixed(df=uscalculator.data)

Unnamed: 0,maincategory,category_id,amv,asin,ranking,rating,mrevenue,days_since_released
0,Bathroom Vanities,6810202011,3680245,B07GZNK5ZX,1,4.4,50826,964
1,Rotary Lasers,553276,2744237,B017LTFSSG,1,4.7,559005,1994
2,Power Jointers,552824,1112195,B004Q0I8YU,1,4.4,482855,3705
3,Two-Piece Toilets,542639011,2386861,B07BSXNDD7,1,4.7,45762,2911
4,One-Piece Toilets,542638011,2987130,B0744CTXW8,1,4.4,221170,1370
...,...,...,...,...,...,...,...,...
1769,Power Jointers,552824,1112195,B08JHRYMLC,58,4.0,0,214
1770,Two-Piece Toilets,542639011,2386861,B08ZJYCF2W,58,0.0,2999,33
1771,One-Piece Toilets,542638011,2987130,B076HCR3QD,58,4.4,34179,1283
1772,Defibrillators,3762871,1466636,Fixed,6,4.7,100557,1224


In [None]:
def _getcii(self, df: pd.DataFrame) -> DataFrame:
    """Competitive Intensity Index"""
    self.cii = (
        (df.groupby("category_id").mrevenue.apply(
            lambda x: int(
                (1-((((((((x / x.sum()) * 100)
                    -(((x / x.sum()) * 100).sum()
                        / x.size))** 2).sum())
                        /(x.size - 1))** 0.5)
                    /((((x.size * (100 ** 2)) - (100 ** 2))
                /(x.size ** 2 - x.size))** 0.5)))* 100)))
        .to_frame()
        .reset_index()
        .rename({"mrevenue": "cii"}, axis="columns"))
    return self.cii

def _getcci(self, df: pd.DataFrame) -> DataFrame:
    """Industrial Concentration Index"""
    self.cci = (
        (
            df.sort_values(by=["mrevenue"], ascending=False)
            .groupby("category_id")
            .head(8)
            .loc[:, ["category_id", "mrevenue"]]
            .groupby("category_id")["mrevenue"]
            .sum()
            / df.sort_values(by=["mrevenue"], ascending=False)
            .groupby("category_id")["mrevenue"]
            .sum()
        )
        .apply(lambda x: int(x * 100))
        .to_frame()
        .reset_index()
        .rename({"mrevenue": "cci"}, axis="columns")
    )
    return self.cci

def _getnvpi(self, df: pd.DataFrame) -> DataFrame:
    """Newly validated product index"""
    _unproccessed = (
        df.query("days_since_released <= 90")
        .groupby("category_id")
        .agg({"mrevenue": np.sum})
    )
    _proccessed = (df.groupby("category_id").agg({"mrevenue": np.sum}))[
        ~(df.groupby("category_id").agg({"mrevenue": np.sum})).index.isin(
            (
                df.query("days_since_released <= 90")
                .groupby("category_id")
                .agg({"mrevenue": np.sum})
            ).index
        )
    ]
    _proccessed["mrevenue"] = 0
    self.nvpi = (
        (
        _unproccessed.append(_proccessed)
        / (df.groupby("category_id").agg({"mrevenue": np.sum}))
        )["mrevenue"]
        .apply(lambda x: int(x * 100))
        .to_frame()
        .reset_index()
        .rename({"mrevenue": "nvpi"}, axis="columns")
    )
    self.nvpi["nvpi"] = (
        self.nvpi["nvpi"].apply(lambda x: 1 / (1 + math.exp(-x)) * 100).astype(int)
    )
    return self.nvpi

def _getpqi(self, df: pd.DataFrame) -> DataFrame:
    """Product Quality Index"""
    self.pqi = (
        (
            (
                df.query("rating <= 4.6").groupby("category_id").ranking.count()
                / df.groupby("category_id").ranking.count()
            )
            * 100
        )
        .fillna(0)
        .astype("int")
        .to_frame()
        .reset_index()
        .rename({"ranking": "pqi"}, axis="columns")
    )
    return self.pqi

### _getcii,_getcci,_getnvpi,_getpqi 方法
_getcii:根据每个品类中销售额的方差与asin分布，计算竞争激烈程度  
  
_getcci:根据每个品类中销售额排名前八的asin销售额占品类总销售额占比，计算行业集中指数  
  
_getnvpi:根据每个品类中上架日期在90天内的asin销售额占品类总销售额占比，计算新品友好指数  
  
_getpqi:根据每个品类中评分不高于4.6分的ASIN数量占ASIN总数的比例，计算商品品质指数


### CII

$$
\sigma_n = \sqrt[2]{\frac{\sum_{i=1}^{N} (a_i-\alpha)^2}{N-1}}\\
\sigma_m = \sqrt[2]{\frac { (N*100^2)-(100)^2 }{N(N-1)}}\\
cii = (1-\frac{\sigma_n }{\sigma_m})*100
    
$$


### CCI

$$
cci = \frac{\sum_{i=1}^{8} \alpha_i}{\sum_{i=1}^{N} \alpha_i} \times 100
$$


### NVPI

$$
x = \left\lfloor \frac{\sum_{i=1}^{n} \alpha_i}{\sum_{i=1}^{N} \alpha_i} \times 100 \right\rfloor\\
nvpi = \left\lfloor \frac{1}{1+e^{-x}} \times 100 \right\rfloor
$$


### PQI

$$
pqi = \frac{\sum_{i=1}^{n} \gamma_i}{\sum_{i=1}^{N} \gamma_i} \times 100
$$


### get_indexes方法
get_indexes方法接受df, postdb, remark, date_check等参数。
df是一个pandas DataFrame， 应该含有index.py中所指定的字段。
如果不传入df参数，则会自动执行get_nrows_data方法获取当前remark的数据。
remark参数接受指定remark，默认为最新可用remark
date_check参数接受boolean值，其将对比当前remark, 并检测数据是否已经经过计算，默认为True。
postdb接受boolean值，默认为False，如果为True，将会执行post_result_db方法，将计算结果保存到数据库。

In [None]:
    async def get_indexes(
        self,
        df: Optional[DataFrame] = None,
        postdb: bool = False,
        remark: Optional[str] = None,
        date_check: bool = True,
    ) -> DataFrame:
        """Calculate the four indexes (cii, cci, pqi, nvpi) per category.

        Args:
            df: raw or already cleaned data with the columns described by
                ``self.cols_dtypes``. When missing or empty, the data is
                fetched with ``get_nrows_data``.
            postdb: when True the result is completed by ``calculate`` and
                stored with ``post_result_db``.
            remark: remark to use instead of the default one.
            date_check: when True the calculation is refused if
                ``is_data_ready`` is False.

        Returns:
            One row per category with columns category_id, maincategory, amv,
            cii, cci, pqi, nvpi and site (plus the columns added by
            ``calculate`` when ``postdb`` is True). Also stored on
            ``self.data``.

        Raises:
            AssertionError: if the data is already up to date, or if cleaned
                input has unexpected column types.
        """
        # Run the date check first: it resolves the default remark and would
        # otherwise overwrite an explicitly requested one.
        if date_check and not (await self.is_data_ready):
            raise AssertionError("data is already up-to-date")
        if remark:
            self._remark = remark
        self.data = pd.DataFrame(df)

        if self.data.empty:
            self.data = await self.get_nrows_data(remark=remark)
        elif getattr(self, "_remark", None) is None:
            self._remark = await self.remark

        if "days_since_released" in self.data.columns.values:
            # Already cleaned input: only validate the column types.
            if not pd.api.types.is_integer_dtype(self.data["days_since_released"]):
                raise AssertionError(
                    """ the column 'days_since_released' should be of type int"""
                )
            for column in self.data.columns.difference(("days_since_released",)):
                if not (self.cols_dtypes[f"{column}"] == self.data[f"{column}"].dtype):
                    raise AssertionError(
                        f"""{column} is not of type {self.cols_dtypes[f'{column}']}"""
                    )
        else:
            # get_data_cleaned is synchronous and must not be awaited.
            self.data = self.get_data_cleaned(df=self.data)

        logger.info(f"calculating indexes using {self._remark}")

        self.data = (
            self.data.drop_duplicates("category_id")
            .merge(self._getcii(df=self.data), on="category_id")
            .merge(self._getcci(df=self.data), on="category_id")
            .merge(self._getpqi(df=self.data), on="category_id")
            .merge(self._getnvpi(df=self.data), on="category_id")
            .loc[:, ["category_id", "maincategory", "amv", "cii", "cci", "pqi", "nvpi"]]
        )
        self.data["site"] = self.site
        logger.info("计算完成")

        if postdb:
            # The date check has already run (or was disabled) above.
            complete = await self.calculate(
                df=self.data, date_check=False, remark=self._remark
            )
            self.post_result_db(result=complete)

        return self.data

In [4]:
await uscalculator.get_indexes(df=uscalculator.data)

[04/21/21 17:15:29] INFO     calculating indexes using a indexcalculator.py:510
                             ll_categories_us_2021-04-18                       
                    INFO     计算完成                    indexcalculator.py:521


Unnamed: 0,category_id,maincategory,amv,cii,cci,pqi,nvpi,site
0,6810202011,Bathroom Vanities,3680245,88,35,98,99,us
1,553276,Rotary Lasers,2744237,74,66,58,50,us
2,552824,Power Jointers,1112195,54,78,68,50,us
3,542639011,Two-Piece Toilets,2386861,86,45,89,73,us
4,542638011,One-Piece Toilets,2987130,80,50,82,73,us
5,393298011,Lab Gowns,191851,84,46,92,98,us
6,384994011,Golf Iron Sets,2876867,78,63,61,99,us
7,3473431,Ice Fishing Shelters,186884,54,74,68,50,us
8,3403611,BMX Bikes,935518,69,68,87,99,us
9,3398811,Boat Electric Wire,1752011,54,86,24,50,us


### calculate方法
calculate方法是calculator子类所共有的一个方法，也是计算逻辑的接口。
calculate方法接受df, postdb, datecheck, remark等参数
其使用方法基本等同于get_indexes方法，
如果向calculate方法传递未计算数据，其会检测是否包含全部四项指数(cii,cci,nvpi,pqi)，如否，则会先调用get_indexes方法，然后再计算另外四项指数(asr,csr,avr,cvr)。

In [None]:
    async def calculate(
        self, 
        df: Optional[DataFrame] = None,
        postdb: bool = False,
        date_check: bool = True,
        remark: Optional[str] = None
    ) -> DataFrame:
        """Entry point of the calculation.

        If ``df`` does not already contain all four index columns (cii, cci,
        pqi, nvpi), they are computed with ``get_indexes``. The result is then
        merged with the data returned by the ``set_ms_sql`` and ``set_sr_sql``
        queries.

        Args:
            df: raw, cleaned or already indexed data; fetched when missing.
            postdb: when True the result is stored with ``post_result_db``.
            date_check: when True the calculation is refused if
                ``is_data_ready`` is False.
            remark: remark to use instead of the default one.

        Returns:
            The merged result, also stored on ``self.data``.

        Raises:
            AssertionError: if ``date_check`` is True and the data is already
                up to date.
        """
        if date_check and not (await self.is_data_ready):
            raise AssertionError("data already is up-to-date")
        if remark:
            self._remark = remark
        elif getattr(self, "_remark", None) is None:
            self._remark = await self.remark
        logger.info("开始全品类查询与计算")
        self.data = pd.DataFrame(df)

        # All four columns must be present; ``("a" and "b") not in cols`` only
        # tests the last name.
        index_columns = {"cii", "cci", "pqi", "nvpi"}
        if not index_columns.issubset(self.data.columns):
            # The date check is done (or disabled) above; repeating it here
            # would reset the remark and ignore ``date_check=False``.
            self.data = await self.get_indexes(
                df=self.data, remark=self._remark, date_check=False
            )

        category_ids = self.data["category_id"].unique()
        # repr of a 1-tuple ends with a comma, so a single id is formatted by hand.
        category_tuple = (
            tuple(category_ids)
            if len(category_ids) > 1
            else f"""('{category_ids[0]}')"""
        )

        ms_sql = self.set_ms_sql(remark=self._remark, category_tuple=category_tuple)
        sr_sql = self.set_sr_sql(category_tuple=category_tuple)

        async with self.async_db_engine.connect() as conn:
            msresult = await conn.execute(text(ms_sql))
            csrresult = await conn.execute(text(sr_sql))
            ms_data = pd.DataFrame(msresult.fetchall(), columns=msresult.keys())
            csr_data = pd.DataFrame(csrresult.fetchall(), columns=csrresult.keys())

        self.data = pd.merge(self.data, ms_data, how="left", on="category_id").merge(
            csr_data
        )
        if postdb:
            self.post_result_db(result=self.data)
        return self.data

In [6]:
await uscalculator.calculate(df=uscalculator.data)

[04/21/21 17:16:02] INFO     开始全品类查询与计算        indexcalculator.py:544


Unnamed: 0,category_id,maincategory,amv,cii,cci,pqi,nvpi,site,cvr,avr,asr,csr
0,6810202011,Bathroom Vanities,3680245,88,35,98,99,us,0.57,0.13,0.185,0.508
1,553276,Rotary Lasers,2744237,74,66,58,50,us,0.01,0.64,0.5,0.053
2,542639011,Two-Piece Toilets,2386861,86,45,89,73,us,0.06,0.12,0.273,0.015
3,542638011,One-Piece Toilets,2987130,80,50,82,73,us,0.08,0.2,0.296,0.141
4,393298011,Lab Gowns,191851,84,46,92,98,us,0.14,0.06,0.022,0.215
5,384994011,Golf Iron Sets,2876867,78,63,61,99,us,0.01,0.19,0.218,0.038
6,3473431,Ice Fishing Shelters,186884,54,74,68,50,us,0.05,0.18,0.326,0.151
7,3398811,Boat Electric Wire,1752011,54,86,24,50,us,0.0,0.05,0.417,0.042
8,11444032011,Girls' Outdoor Recreation Pants,224757,66,67,66,73,us,,,0.0,0.0
9,13299141,Outdoor Ovens,1826467,81,54,78,99,us,0.03,0.09,0.067,0.2


### post_result_db方法
post_result_db方法 是受保护方法，是计算过程中的最后一步，负责将计算结果保存到数据库。
post_result_db接受result参数作为数据源，并会添加数据源使用的remark为remark_used，及计算时间为created_at
post_result_db接受table name参数作为存入数据库表格的表名，默认为categories_index_{site}， site为实例的属性。
post_result_db接受method作为存入数据库的方式，默认为append，即在原数据表格上增添数据。
如果将method设置为replace，原表格将被替换为新的表格，并将category_id,remark_used设为联合主键。

In [None]:
   def _post_result_db(
        self,
        *,
        result: pd.DataFrame,
        table_name: Optional[str] = None,
        method: str = "append",
    ) -> None:
        """Store the calculation result in the database.

        Nothing is written unless ``is_data_ready`` was evaluated to True
        beforehand; in that case only a log line is emitted.

        Args:
            result: calculation result; must contain the twelve index columns
                selected below.
            table_name: target table, default ``categories_index_{site}``.
            method: passed to ``to_sql`` as ``if_exists``. With "replace" a
                primary key on (category_id, remark_used) is added after the
                table is rewritten.

        ``remark_used`` (date taken from the last 10 characters of the remark)
        and ``created_at`` (``self._nowtime``) are added, and the column
        comments from ``db_columns_comments`` are applied.
        """
        if not getattr(self, "_is_data_ready", False):
            logger.info(
                f"{self._remark} is nearer than {getattr(self, '_last_update_date', None)}"
            )
            return

        self.table_name = table_name or f"categories_index_{self.site}"
        # drop_duplicates returns a new frame, so the writes below never
        # target a slice of ``result``.
        self.result = result.loc[
            :,
            [
                "category_id",
                "maincategory",
                "site",
                "amv",
                "cii",
                "cci",
                "pqi",
                "nvpi",
                "cvr",
                "avr",
                "asr",
                "csr",
            ],
        ].drop_duplicates("category_id")
        self.result["remark_used"] = pd.to_datetime(
            self._remark[-10:], utc=False
        ).normalize()
        self.result["created_at"] = pd.to_datetime(
            self._nowtime, utc=False
        ).normalize()
        self.outputdict = self.set_column_types(self.result)

        with self.db_engine.connect() as conn:
            self.result.to_sql(
                f"{self.table_name}",
                if_exists=method,
                con=conn,
                method="multi",
                index=False,
                dtype=self.outputdict,
            )
            # Raw SQL strings are wrapped in text(); plain strings are not
            # executable on SQLAlchemy 2.
            if method == "replace":
                conn.execute(
                    text(
                        f"""alter table math_model.{self.table_name} add primary key(category_id, remark_used);"""
                    )
                )
            for column, dtype in self.outputdict.items():
                # Escape quotes for SQL, and colons so text() does not read
                # them as bind parameters.
                comment = (
                    self.db_columns_comments[column]
                    .replace("'", "''")
                    .replace(":", "\\:")
                )
                set_comment = f"""alter table math_model.{self.table_name} modify {column} {dtype} comment '{comment}';"""
                conn.execute(text(set_comment))
        logger.info(f"已存入 math_model.{self.table_name}")

   # Callers in this class use the public name.
   post_result_db = _post_result_db

### main方法
直接执行indexcalculator.py文件将会激活main方法，以各站点为site参数循环创建Indexcalculator并执行calculate方法。

In [None]:
async def main() -> Optional[DataFrame]:
    """Calculate and store the indexes of every site listed in ``countries``.

    Sites are processed one after another. A site whose calculation is refused
    with ``AssertionError`` (for example because its data is already up to
    date) is logged and skipped, so the remaining sites still run. Any other
    error propagates. Nothing is returned.
    """
    calculators = [IndexCalculator(f"{country}") for country in countries]
    for calculator in calculators:
        logger.info(f"now calculating {calculator.site} site data")
        try:
            await calculator.calculate(postdb=True)
        except AssertionError as refusal:
            logger.info(f"{calculator.site} site skipped: {refusal}")
            continue
        logger.info(f"{calculator.site} index calculation finished")
    logger.info("All site index calculation finished")

# Site codes processed by main(), in this order.
countries = ["us", "uk", "de", "it", "jp", "fr", "es"]

# Run the calculation for all sites only when executed as a script.
if __name__ == "__main__":
    logger.info("start running indexes calculation of all sites")
    asyncio.run(main())

# uscalculator = build_calculator(IndexCalculator('us'))
# Module-level instance for the "us" site; it is created whenever this module
# is executed or imported.
uscalculator = IndexCalculator('us')
