<a href="https://colab.research.google.com/github/lxradda/1scm1x/blob/main/untitled3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# requirements.txt
web3==6.12.0
pandas==2.0.3
scikit-learn==1.3.0
gradio==3.45.0
requests==2.31.0
python-dotenv==1.0.0
joblib==1.3.2
numpy==1.24.3
tweepy==4.14.0
beautifulsoup4==4.12.2

# .env (environment variables)
INFURA_PROJECT_ID=your_infura_project_id
ETHERSCAN_API_KEY=your_etherscan_api_key
TWITTER_BEARER_TOKEN=your_twitter_bearer_token

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    """Credentials and endpoints read from the process environment.

    Each attribute is evaluated once, when the class body executes.  A
    variable that is not set yields ``None``; in that case ``INFURA_URL``
    ends with the literal text ``None``.
    """

    # API credentials for the external services used by the analyzers.
    INFURA_PROJECT_ID = os.environ.get('INFURA_PROJECT_ID')
    ETHERSCAN_API_KEY = os.environ.get('ETHERSCAN_API_KEY')
    TWITTER_BEARER_TOKEN = os.environ.get('TWITTER_BEARER_TOKEN')

    # Mainnet JSON-RPC endpoint built from the project id above.
    INFURA_URL = "https://mainnet.infura.io/v3/{}".format(INFURA_PROJECT_ID)

# utils/onchain_checker.py
import requests
from web3 import Web3
from config import Config
import json
import time

class OnchainChecker:
    """Gather risk-related features for a token contract.

    Two data sources are used: a Web3 HTTP provider pointed at
    ``Config.INFURA_URL`` (direct contract calls) and the Etherscan HTTP API
    at ``https://api.etherscan.io/api`` (holder list, source code, first
    transaction).  Every private helper is best-effort: when a lookup fails
    it returns a fallback value instead of raising.
    """

    def __init__(self):
        self.w3 = Web3(Web3.HTTPProvider(Config.INFURA_URL))
        self.etherscan_api = Config.ETHERSCAN_API_KEY

    def get_contract_features(self, address):
        """Return the feature dictionary for ``address``.

        Args:
            address: Contract address as a hex string.

        Returns:
            dict with the keys ``holder_concentration``, ``mint_function``,
            ``lp_locked``, ``max_tax``, ``owner_renounced``,
            ``contract_verified``, ``creation_date``, ``total_supply`` and
            ``holder_count``.  If anything raises (including an invalid
            address) the error is printed and ``_get_default_features()``
            is returned instead.
        """
        try:
            # Validate the contract address before any network call.
            if not self.w3.is_address(address):
                raise ValueError("Ge√ßersiz kontrat adresi")

            address = self.w3.to_checksum_address(address)

            # Fetch the raw inputs the features are derived from.
            token_info = self._get_token_info(address)
            holder_data = self._get_holder_distribution(address)
            contract_code = self._get_contract_source(address)
            liquidity_info = self._check_liquidity_lock(address)

            return {
                "holder_concentration": self._calculate_concentration(holder_data),
                "mint_function": self._check_mint_function(contract_code),
                "lp_locked": liquidity_info.get("locked", False),
                "max_tax": self._extract_max_tax(contract_code),
                "owner_renounced": self._check_ownership_renounced(address),
                "contract_verified": contract_code.get("verified", False),
                "creation_date": self._get_creation_date(address),
                "total_supply": token_info.get("total_supply", 0),
                "holder_count": len(holder_data)
            }
        except Exception as e:
            print(f"Hata: {e}")
            return self._get_default_features()

    def _get_token_info(self, address):
        """Read ``totalSupply()`` from the contract.

        Returns:
            ``{"total_supply": <int>}``; the value is 0 when the call fails.
        """
        try:
            # Minimal ERC-20 ABI: only the function that is actually called.
            erc20_abi = [
                {
                    "constant": True,
                    "inputs": [],
                    "name": "totalSupply",
                    "outputs": [{"name": "", "type": "uint256"}],
                    "type": "function"
                }
            ]

            contract = self.w3.eth.contract(address=address, abi=erc20_abi)
            total_supply = contract.functions.totalSupply().call()

            return {"total_supply": total_supply}
        except Exception:
            # Best-effort: a failed call is reported as a supply of 0.
            return {"total_supply": 0}

    def _get_holder_distribution(self, address):
        """Fetch the first page (up to 100 entries) of token holders from Etherscan.

        Returns:
            The ``result`` list of the response when its ``status`` is
            ``"1"``, otherwise an empty list (also on any error).
        """
        try:
            url = "https://api.etherscan.io/api"
            params = {
                "module": "token",
                "action": "tokenholderlist",
                "contractaddress": address,
                "page": 1,
                "offset": 100,
                "apikey": self.etherscan_api
            }

            response = requests.get(url, params=params, timeout=10)
            data = response.json()

            if data.get("status") == "1":
                return data.get("result", [])
            return []
        except Exception:
            # Best-effort: network/JSON problems mean "no holder data".
            return []

    def _calculate_concentration(self, holders):
        """Return the percentage held by the ten largest supplied holders.

        The denominator is the sum of the balances in ``holders`` (the
        entries that were fetched), not the token's on-chain total supply.

        Args:
            holders: List of dicts carrying a ``TokenHolderQuantity`` value.

        Returns:
            A percentage capped at 100.  100 when ``holders`` is empty or
            the balances sum to zero; 75 when a balance cannot be parsed.
        """
        if not holders:
            return 100  # No data is treated as high risk.

        try:
            # Sort explicitly so "top 10" does not depend on the order in
            # which the API happened to return the holders.
            balances = sorted(
                (float(h.get("TokenHolderQuantity", 0)) for h in holders),
                reverse=True,
            )
            total_supply = sum(balances)

            if total_supply == 0:
                return 100

            concentration = (sum(balances[:10]) / total_supply) * 100
            return min(concentration, 100)  # Cap at 100%.
        except Exception:
            return 75  # Fallback when the holder data is malformed.

    def _get_contract_source(self, address):
        """Fetch the contract source code from Etherscan.

        Returns:
            ``{"source_code": <str>, "verified": <bool>}`` where ``verified``
            is True exactly when a non-empty source string was returned.
            Any failure yields an empty, unverified result.
        """
        try:
            url = "https://api.etherscan.io/api"
            params = {
                "module": "contract",
                "action": "getsourcecode",
                "address": address,
                "apikey": self.etherscan_api
            }

            response = requests.get(url, params=params, timeout=10)
            data = response.json()

            if data.get("status") == "1" and data.get("result"):
                source_code = data["result"][0].get("SourceCode", "")
                return {
                    "source_code": source_code,
                    "verified": len(source_code) > 0
                }
            return {"source_code": "", "verified": False}
        except Exception:
            return {"source_code": "", "verified": False}

    def _check_mint_function(self, contract_data):
        """Return True if the lowercased source contains any mint-related keyword."""
        source_code = contract_data.get("source_code", "").lower()
        mint_keywords = ["function mint", "mint(", "_mint(", "mintto", "mint_"]
        return any(keyword in source_code for keyword in mint_keywords)

    def _extract_max_tax(self, contract_data):
        """Return the largest literal assigned to a tax/fee-like name.

        Looks for ``<name> = <digits>`` in the lowercased source, where
        ``<name>`` ends with one of ``buytax``, ``selltax``, ``tax`` or
        ``fee``, e.g. ``uint256 public buyTax = 5;``.  All patterns are
        scanned and the overall maximum is returned, so a low buy tax
        cannot hide a high sell tax.

        Returns:
            The maximum matched integer, or 10 when nothing matches.
        """
        import re

        source_code = contract_data.get("source_code", "").lower()

        # Common tax variable names.
        tax_patterns = ["buytax", "selltax", "tax", "fee"]

        found = []
        for pattern in tax_patterns:
            regex = rf"{pattern}\s*=\s*(\d+)"
            found.extend(int(match) for match in re.findall(regex, source_code))

        if found:
            return max(found)
        return 10  # Default when no assignment is found.

    def _check_ownership_renounced(self, address):
        """Return True if the contract's ``owner()`` is the zero address.

        Returns False when the call fails (for example when the contract
        has no ``owner()`` function).
        """
        try:
            owner_abi = [{
                "constant": True,
                "inputs": [],
                "name": "owner",
                "outputs": [{"name": "", "type": "address"}],
                "type": "function"
            }]

            contract = self.w3.eth.contract(address=address, abi=owner_abi)
            owner = contract.functions.owner().call()

            # Ownership counts as renounced when it points at 0x000...000.
            return owner == "0x0000000000000000000000000000000000000000"
        except Exception:
            return False

    def _check_liquidity_lock(self, address):
        """Liquidity-lock check (stub).

        Always reports the liquidity as not locked; a real implementation
        would need to inspect the DEX pool contracts.
        """
        return {"locked": False, "lock_time": 0}

    def _get_creation_date(self, address):
        """Return the timestamp of the address's first listed transaction.

        Queries Etherscan ``txlist`` sorted ascending with a page size of 1.

        Returns:
            The ``timeStamp`` of the first result as an int, or 0 when the
            lookup fails or returns nothing.
        """
        try:
            url = "https://api.etherscan.io/api"
            params = {
                "module": "account",
                "action": "txlist",
                "address": address,
                "startblock": 0,
                "endblock": 99999999,
                "page": 1,
                "offset": 1,
                "sort": "asc",
                "apikey": self.etherscan_api
            }

            response = requests.get(url, params=params, timeout=10)
            data = response.json()

            if data.get("status") == "1" and data.get("result"):
                timestamp = int(data["result"][0].get("timeStamp", 0))
                return timestamp
            return 0
        except Exception:
            return 0

    def _get_default_features(self):
        """Return the pessimistic feature set used when analysis fails."""
        return {
            "holder_concentration": 75,
            "mint_function": True,
            "lp_locked": False,
            "max_tax": 25,
            "owner_renounced": False,
            "contract_verified": False,
            "creation_date": 0,
            "total_supply": 0,
            "holder_count": 0
        }

# utils/social_analysis.py
import requests
import tweepy
from config import Config
from datetime import datetime, timedelta

class SocialAnalyzer:
    """Derive coarse social-media risk signals for a token from recent tweets.

    A Twitter client is created only when ``Config.TWITTER_BEARER_TOKEN`` is
    set; without it every analysis returns the neutral default signals.
    """

    def __init__(self):
        if Config.TWITTER_BEARER_TOKEN:
            self.twitter_client = tweepy.Client(bearer_token=Config.TWITTER_BEARER_TOKEN)
        else:
            self.twitter_client = None

    def analyze_social_signals(self, token_name, contract_address):
        """Search recent tweets about the token and summarise them.

        Args:
            token_name: Name or ticker to search for; may be empty.
            contract_address: Contract address; its first 10 characters are
                used as a second search term.

        Returns:
            dict with ``twitter_mentions`` (int), ``sentiment_score``
            (0-1 float), ``bot_activity`` (bool) and ``pump_dump_keywords``
            (bool).  Defaults are returned when no client is configured,
            nothing is found, or the API call fails (the error is printed).
        """
        signals = {
            "twitter_mentions": 0,
            "sentiment_score": 0.5,  # Range 0-1, 0.5 is neutral.
            "bot_activity": False,
            "pump_dump_keywords": False
        }

        if not self.twitter_client:
            return signals

        try:
            # NOTE(review): only a 10-character prefix of the address is
            # searched; confirm that this can match tweets that contain the
            # full address.
            terms = [term for term in (token_name, contract_address[:10]) if term]
            if terms:
                # The OR terms are parenthesised so the "-is:retweet lang:en"
                # filters apply to all of them; without grouping the implicit
                # AND would bind the filters to the last term only.
                query = "(" + " OR ".join(terms) + ") -is:retweet lang:en"
                # No time window is passed, so the endpoint's default
                # recent-search window applies.
                tweets = self.twitter_client.search_recent_tweets(
                    query=query,
                    max_results=50,
                    tweet_fields=["created_at", "author_id", "public_metrics"]
                )

                if tweets.data:
                    signals["twitter_mentions"] = len(tweets.data)
                    signals["pump_dump_keywords"] = self._check_pump_dump_keywords(tweets.data)
                    signals["bot_activity"] = self._detect_bot_activity(tweets.data)
                    signals["sentiment_score"] = self._calculate_sentiment(tweets.data)

        except Exception as e:
            print(f"Twitter API hatasƒ±: {e}")

        return signals

    def _check_pump_dump_keywords(self, tweets):
        """Return True if any pump-and-dump keyword occurs in the tweets' text."""
        pump_keywords = [
            "moon", "üöÄ", "pump", "easy money", "quick profit",
            "100x", "1000x", "get rich", "diamond hands", "ape in"
        ]

        tweet_texts = " ".join(tweet.text.lower() for tweet in tweets)
        return any(keyword in tweet_texts for keyword in pump_keywords)

    def _detect_bot_activity(self, tweets):
        """Return True when the tweets are dominated by duplicated text.

        Fewer than 10 tweets are never flagged.  Otherwise the result is
        True when fewer than 30% of the tweet texts are distinct.
        """
        if len(tweets) < 10:
            return False

        tweet_texts = [tweet.text for tweet in tweets]
        unique_tweets = set(tweet_texts)

        return len(unique_tweets) / len(tweet_texts) < 0.3

    def _calculate_sentiment(self, tweets):
        """Return a naive keyword-based sentiment score in the range 0-1.

        Each tweet scores ``positive / (positive + negative)`` keyword hits
        (substring matches), or 0.5 when it contains none; the result is the
        mean over all tweets, or 0.5 for an empty input.
        """
        positive_words = ["good", "great", "amazing", "bullish", "buy"]
        negative_words = ["scam", "rug", "fake", "avoid", "dump"]

        total_score = 0
        for tweet in tweets:
            text = tweet.text.lower()
            pos_count = sum(word in text for word in positive_words)
            neg_count = sum(word in text for word in negative_words)

            if pos_count + neg_count > 0:
                total_score += pos_count / (pos_count + neg_count)
            else:
                total_score += 0.5  # Neutral.

        return total_score / len(tweets) if tweets else 0.5

# utils/code_analysis.py
import re

class CodeAnalyzer:
    """Regex-based scanner that turns contract source code into risk flags."""

    def __init__(self):
        # Regex groups keyed by the kind of risk they indicate.
        self.risk_patterns = {
            "mint_function": [
                r"function\s+mint\s*\(", r"function\s+_mint\s*\(",
                r"\.mint\s*\(", r"mintTo\s*\(",
            ],
            "ownership_issues": [
                r"onlyOwner", r"_owner\s*=",
                r"transferOwnership", r"renounceOwnership",
            ],
            "liquidity_removal": [
                r"removeLiquidity", r"withdraw.*Liquidity", r"emergencyWithdraw",
            ],
            "tax_functions": [
                r"buyTax\s*=", r"sellTax\s*=", r"setTax", r"updateTax",
            ],
            "blacklist_functions": [
                r"blacklist", r"addBot", r"removeBot", r"setBot",
            ],
            "pause_functions": [
                r"pause\s*\(", r"unpause\s*\(", r"setPaused",
            ],
        }

    def analyze_contract_code(self, address, source_code=""):
        """Scan ``source_code`` and return a list of human-readable flags.

        Args:
            address: Contract address (not used by the scan itself).
            source_code: Contract source; an empty value yields a single
                "could not be verified" flag.

        Returns:
            A list of flag strings; a single "clean" entry when no pattern
            group matched.
        """
        if not source_code:
            return ["‚ö†Ô∏è Kontrat kodu doƒürulanamadƒ±"]

        lowered = source_code.lower()

        def has(group):
            # True when any regex of the named group matches the source.
            return self._check_pattern_exists(lowered, self.risk_patterns[group])

        found = []

        # Mint capability.
        if has("mint_function"):
            found.append("üî¥ Mint fonksiyonu tespit edildi")

        # Owner privileges that do not look renounced.
        if has("ownership_issues") and not self._check_ownership_renounced(lowered):
            found.append("üî¥ Owner haklarƒ± korunuyor")

        # Functions able to pull liquidity.
        if has("liquidity_removal"):
            found.append("üî¥ Likidite √ßekme fonksiyonu var")

        # Tax settings: only reported above a 10 threshold.
        if has("tax_functions"):
            rate = self._extract_tax_rate(lowered)
            if rate > 10:
                found.append(f"üî¥ Y√ºksek vergi oranƒ±: %{rate}")

        # Blacklisting.
        if has("blacklist_functions"):
            found.append("üî¥ Blacklist fonksiyonu tespit edildi")

        # Trading pause switches.
        if has("pause_functions"):
            found.append("üî¥ Trading durdurma fonksiyonu var")

        # Honeypot-style constructs.
        if self._check_honeypot_patterns(lowered):
            found.append("üî¥ Honeypot kalƒ±plarƒ± tespit edildi")

        return found if found else ["‚úÖ Kod analizi temiz"]

    def _check_pattern_exists(self, code, patterns):
        """Return True if at least one regex in ``patterns`` matches ``code`` (case-insensitive)."""
        return any(re.search(expr, code, re.IGNORECASE) for expr in patterns)

    def _check_ownership_renounced(self, code):
        """Return True if the source contains a renounce-ownership pattern."""
        return self._check_pattern_exists(
            code,
            [
                r"renounceownership\s*\(\s*\)",
                r"_owner\s*=.*0x0+",
                r"owner.*=.*address\(0\)",
            ],
        )

    def _extract_tax_rate(self, code):
        """Return the largest integer captured by the tax regexes, or 0 if none match."""
        expressions = (
            r"buytax\s*=\s*(\d+)",
            r"selltax\s*=\s*(\d+)",
            r"tax.*=\s*(\d+)",
        )
        rates = []
        for expr in expressions:
            rates.extend(int(hit) for hit in re.findall(expr, code, re.IGNORECASE))
        return max(rates, default=0)

    def _check_honeypot_patterns(self, code):
        """Return True if the source matches any honeypot heuristic."""
        return self._check_pattern_exists(
            code,
            [
                r"require.*balances\[.*\]\s*>=",  # Balance requirement
                r"if.*balances\[.*\].*return false",  # Blocked transfer
                r"mapping.*bool.*isexcluded",  # Exclusion mapping
                r"tradingopen\s*=\s*false",  # Trading switched off
            ],
        )

# utils/feature_engineering.py
import numpy as np
from datetime import datetime

class FeatureEngineer:
    """Turn raw analysis dictionaries into the numeric vector fed to the model."""

    def build_features(self, onchain_data, social_data=None):
        """Build the 12-element feature vector.

        Args:
            onchain_data: Dict of on-chain features; missing keys fall back
                to pessimistic defaults.
            social_data: Optional dict of social signals; when falsy,
                neutral defaults are used.

        Returns:
            ``numpy.ndarray`` whose element order matches
            ``get_feature_names()``.
        """
        read = onchain_data.get

        # Core on-chain features (booleans become 0/1).
        vector = [
            read("holder_concentration", 75),
            int(read("mint_function", True)),
            int(read("lp_locked", False)),
            read("max_tax", 25),
            int(read("owner_renounced", False)),
            int(read("contract_verified", False)),
        ]

        # Contract age in days, capped at one year; 0 when unknown.
        created_at = read("creation_date", 0)
        age_days = 0
        if created_at > 0:
            elapsed_days = (datetime.now().timestamp() - created_at) / 86400
            age_days = min(elapsed_days, 365)
        vector.append(age_days)

        # Holder count on a log10 scale (counts below 1 are treated as 1).
        holders = read("holder_count", 1)
        vector.append(np.log10(max(holders, 1)))

        # Social-media features, or neutral defaults.
        if social_data:
            vector += [
                social_data.get("twitter_mentions", 0),
                social_data.get("sentiment_score", 0.5) * 100,  # 0-100 scale
                int(social_data.get("bot_activity", False)),
                int(social_data.get("pump_dump_keywords", False)),
            ]
        else:
            vector += [0, 50, 0, 0]

        return np.array(vector)

    def get_feature_names(self):
        """Return the feature names in the order used by ``build_features``."""
        onchain_names = [
            "holder_concentration", "mint_function", "lp_locked",
            "max_tax", "owner_renounced", "contract_verified",
            "age_days", "log_holder_count",
        ]
        social_names = [
            "twitter_mentions", "sentiment_score",
            "bot_activity", "pump_dump_keywords",
        ]
        return onchain_names + social_names

SyntaxError: invalid syntax (<ipython-input-1-41716b11d9ef>, line 2)

In [None]:
!pip install web3==6.12.0 pandas==2.0.3 scikit-learn==1.3.0 gradio==3.45.0 requests==2.31.0 python-dotenv==1.0.0 joblib==1.3.2 numpy==1.24.3 tweepy==4.14.0 beautifulsoup4==4.12.2



# Task
Integrate the provided Python utility files (`config.py`, `utils/onchain_checker.py`, `utils/social_analysis.py`, `utils/code_analysis.py`, `utils/feature_engineering.py`) into a `main.py` file. Create a Gradio interface in `main.py` that takes a contract address as input, uses the utility classes to perform analysis and feature engineering, and displays the results along with a placeholder risk prediction. Provide instructions on how to run the Gradio application.

## Create `main.py`

### Subtask:
Create a new Python file named `main.py` that will orchestrate the data collection, analysis, and feature engineering using the classes defined in the utility files.


**Reasoning**:
The first step is to create the `main.py` file and add the necessary imports and a placeholder function to structure the code.



In [None]:
# main.py
import gradio as gr
import numpy as np
import pandas as pd
from config import Config
from utils.onchain_checker import OnchainChecker
from utils.social_analysis import SocialAnalyzer
from utils.code_analysis import CodeAnalyzer
from utils.feature_engineering import FeatureEngineer

def analyze_contract(contract_address):
    """Return placeholder output for a contract analysis.

    The real analysis is not wired in yet, so the argument is ignored.

    Args:
        contract_address (str): The blockchain contract address.

    Returns:
        tuple: ``(analysis_text, features_text)`` placeholder strings.
    """
    analysis_text = "Analysis results will appear here."
    features_text = "Features will be listed here."
    return analysis_text, features_text

if __name__ == "__main__":
    # The Gradio interface has not been added yet; nothing runs here.
    pass

ModuleNotFoundError: No module named 'config'

**Reasoning**:
The previous command failed because the files `config.py` and the files in the `utils` directory were not created in the notebook environment. I need to create these files before attempting to import from them. I will recreate the content of these files in separate code blocks and then attempt to create `main.py` again.



In [None]:
%%writefile config.py
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    """Credentials and endpoints read from the process environment.

    Each attribute is evaluated once, when the class body executes.  A
    variable that is not set yields ``None``; in that case ``INFURA_URL``
    ends with the literal text ``None``.
    """

    # API credentials for the external services used by the analyzers.
    INFURA_PROJECT_ID = os.environ.get('INFURA_PROJECT_ID')
    ETHERSCAN_API_KEY = os.environ.get('ETHERSCAN_API_KEY')
    TWITTER_BEARER_TOKEN = os.environ.get('TWITTER_BEARER_TOKEN')

    # Mainnet JSON-RPC endpoint built from the project id above.
    INFURA_URL = "https://mainnet.infura.io/v3/{}".format(INFURA_PROJECT_ID)

Writing config.py


**Reasoning**:
Now that `config.py` is created, I need to create the `utils` directory and the Python files within it before I can create `main.py`.



In [None]:
# main.py
from utils.onchain_checker import OnchainChecker
from utils.social_analysis import SocialAnalyzer
from utils.code_analysis import CodeAnalyzer
from utils.feature_engineering import FeatureEngineer
import joblib
import os
import numpy as np
from datetime import datetime

class ScamDetector:
    """Orchestrates on-chain, social and source-code analysis into one report.

    A trained model is loaded from ``models/scam_model.pkl`` when that file
    exists; without it the report carries a neutral risk score of 50.
    """

    def __init__(self):
        self.onchain_checker = OnchainChecker()
        self.social_analyzer = SocialAnalyzer()
        self.code_analyzer = CodeAnalyzer()
        self.feature_engineer = FeatureEngineer()

        # Load the model if one has been trained and saved.
        model_path = "models/scam_model.pkl"
        if os.path.exists(model_path):
            # NOTE: joblib.load unpickles the file, which can execute
            # arbitrary code - only load model files from a trusted source.
            self.model = joblib.load(model_path)
        else:
            print("‚ö†Ô∏è Model bulunamadƒ±. √ñnce modeli eƒüitmeniz gerekiyor.")
            self.model = None

    def analyze_contract(self, contract_address, token_name=""):
        """Run the full analysis pipeline for one contract.

        Args:
            contract_address: Address of the token contract.
            token_name: Optional token name; social analysis runs only when
                it is non-empty.

        Returns:
            dict with the keys ``contract``, ``token_name``, ``risk_score``,
            ``ml_prediction``, ``red_flags``, ``onchain_data``,
            ``social_data``, ``analysis_time`` and ``recommendations``.
            When any step raises, the same keys are returned with a risk
            score of 100, empty data fields and an additional ``error`` key
            holding the exception text.
        """
        print(f"üîç Analiz ba≈ülƒ±yor: {contract_address}")

        try:
            # 1. On-chain data.
            print("üìä Onchain veriler alƒ±nƒ±yor...")
            onchain_features = self.onchain_checker.get_contract_features(contract_address)

            # 2. Social-media analysis (optional).
            social_features = None
            if token_name:
                print("üê¶ Sosyal medya analizi yapƒ±lƒ±yor...")
                social_features = self.social_analyzer.analyze_social_signals(token_name, contract_address)

            # 3. Source-code analysis.
            print("üíª Kontrat kodu analiz ediliyor...")
            source_code = self.onchain_checker._get_contract_source(contract_address).get("source_code", "")
            code_flags = self.code_analyzer.analyze_contract_code(contract_address, source_code)

            # 4. Feature vector for the model.
            features = self.feature_engineer.build_features(onchain_features, social_features)

            # 5. Risk score; stays at the neutral default when no model is
            #    loaded or the prediction itself fails.
            risk_score = 50
            ml_prediction = "Model bulunamadƒ±"

            if self.model is not None:
                try:
                    risk_probability = self.model.predict_proba([features])[0][1]
                    risk_score = round(risk_probability * 100, 2)
                    ml_prediction = self._label_probability(risk_probability)
                except Exception as e:
                    print(f"Model tahmin hatasƒ±: {e}")

            # 6. Combined report.
            return {
                "contract": contract_address,
                "token_name": token_name,
                "risk_score": risk_score,
                "ml_prediction": ml_prediction,
                "red_flags": code_flags,
                "onchain_data": onchain_features,
                "social_data": social_features,
                "analysis_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "recommendations": self._generate_recommendations(risk_score, onchain_features, code_flags)
            }

        except Exception as e:
            print(f"‚ùå Analiz hatasƒ±: {e}")
            # Same schema as the success path so callers can index the
            # result without special-casing failures.
            return {
                "contract": contract_address,
                "token_name": token_name,
                "risk_score": 100,  # Failures are reported as maximum risk.
                "ml_prediction": "Analiz edilemedi",
                "red_flags": [f"‚ùå Analiz hatasƒ±: {str(e)}"],
                "onchain_data": None,
                "social_data": None,
                "analysis_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "recommendations": [],
                "error": str(e)
            }

    @staticmethod
    def _label_probability(risk_probability):
        """Map a scam probability to a label: >0.7 scam, <0.3 safe, else caution."""
        if risk_probability > 0.7:
            return "Scam"
        if risk_probability < 0.3:
            return "G√ºvenli"
        return "Dikkatli ol"

    def _generate_recommendations(self, risk_score, onchain_data, code_flags):
        """Build the list of recommendation strings for a report.

        Args:
            risk_score: Score in percent; selects the risk tier
                (>=80, >=60, >=40, otherwise low).
            onchain_data: Feature dict; unverified source, holder
                concentration above 50 and non-renounced ownership each add
                one extra hint.
            code_flags: Source-code flags (currently not consulted).

        Returns:
            list of recommendation strings.
        """
        recommendations = []

        if risk_score >= 80:
            recommendations.append("üö® Y√úK¬≠SEK Rƒ∞SK: Bu token'a yatƒ±rƒ±m yapmayƒ±n!")
            recommendations.append("üí° Ba≈üka projeleri ara≈ütƒ±rƒ±n")
        elif risk_score >= 60:
            recommendations.append("‚ö†Ô∏è ORTA Rƒ∞SK: √áok dikkatli olun")
            recommendations.append("üí° K√º√ß√ºk miktarla test edin")
            recommendations.append("üí° Exit stratejinizi belirleyin")
        elif risk_score >= 40:
            recommendations.append("üü° D√ú¬≠≈û√úK-ORTA Rƒ∞SK: Ara≈ütƒ±rma yapƒ±n")
            recommendations.append("üí° Topluluk ve geli≈ütirici aktivitesini kontrol edin")
        else:
            recommendations.append("‚úÖ D√ú¬≠≈û√úK Rƒ∞SK: G√∂rece g√ºvenli g√∂r√ºn√ºyor")
            recommendations.append("üí° Yine de kendi ara≈ütƒ±rmanƒ±zƒ± yapƒ±n")

        # Hints for specific on-chain findings.
        if not onchain_data.get("contract_verified", False):
            recommendations.append("üí° Kontrat kodu doƒürulanmamƒ±≈ü - ekstra dikkat")

        if onchain_data.get("holder_concentration", 0) > 50:
            recommendations.append("üí° Token daƒüƒ±lƒ±mƒ± centralized - whale riski")

        if not onchain_data.get("owner_renounced", False):
            recommendations.append("üí° Owner haklarƒ± hala aktif - kontrol riski")

        return recommendations

def main():
    """Smoke test: analyze the USDC contract and print the report.

    Builds a ``ScamDetector``, analyzes one hard-coded address and prints
    the score, flags, recommendations and (when present) on-chain data.
    """
    detector = ScamDetector()

    # Canonical Ethereum-mainnet USDC contract address.  The address used
    # here previously was labelled USDC but was not the USDC contract.
    test_address = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"  # USDC

    result = detector.analyze_contract(test_address, "USDC")

    print("\n" + "="*50)
    print("ü§ñ SCAM BOT ANALYSIS SONU√áLARI")
    print("="*50)
    print(f"üìß Kontrat: {result['contract']}")
    print(f"üéØ Risk Skoru: {result['risk_score']}%")
    print(f"ü§ñ ML Tahmini: {result['ml_prediction']}")
    print(f"üìÖ Analiz Zamanƒ±: {result.get('analysis_time', 'Bilinmiyor')}")

    print("\nüö© Risk Bayraklarƒ±:")
    for flag in result['red_flags']:
        print(f"  {flag}")

    print("\nüí° √ñneriler:")
    for rec in result.get('recommendations', []):
        print(f"  {rec}")

    # On-chain details are missing from error results, so guard the section.
    if result.get('onchain_data'):
        print("\nüìä Onchain Veriler:")
        data = result['onchain_data']
        print(f"  ‚Ä¢ Holder Konsantrasyonu: %{data.get('holder_concentration', 0):.1f}")
        print(f"  ‚Ä¢ Mint Fonksiyonu: {'Var' if data.get('mint_function') else 'Yok'}")
        print(f"  ‚Ä¢ LP Kilidi: {'Var' if data.get('lp_locked') else 'Yok'}")
        print(f"  ‚Ä¢ Maksimum Tax: %{data.get('max_tax', 0)}")
        print(f"  ‚Ä¢ Owner Renounced: {'Evet' if data.get('owner_renounced') else 'Hayƒ±r'}")
        print(f"  ‚Ä¢ Kontrat Doƒürulanmƒ±≈ü: {'Evet' if data.get('contract_verified') else 'Hayƒ±r'}")

if __name__ == "__main__":
    main()

ModuleNotFoundError: No module named 'utils'

In [None]:
# app/gradio_interface.py
import gradio as gr
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from main import ScamDetector
import json

class ScamBotInterface:
    def __init__(self):
        self.detector = ScamDetector()

    def analyze_token(self, contract_address, token_name="", include_social=False):
        """Gradio interface i√ßin analiz fonksiyonu"""

        if not contract_address.strip():
            return "

SyntaxError: unterminated string literal (detected at line 18) (<ipython-input-4-9cb65735d2bb>, line 18)

In [None]:
!mkdir utils

In [None]:
%%writefile utils/onchain_checker.py
import requests
from web3 import Web3
from config import Config
import json
import time

class OnchainChecker:
    """Collect on-chain risk features for a token contract.

    Contract state is read through a Web3 HTTP provider (``Config.INFURA_URL``)
    and holder, source-code and transaction data are requested from the
    Etherscan HTTP API. Each lookup falls back to a fixed default when it
    fails, and ``get_contract_features`` returns ``_get_default_features()``
    if anything unexpected escapes.
    """

    # Endpoint shared by every Etherscan request issued by this class.
    _ETHERSCAN_URL = "https://api.etherscan.io/api"
    # Seconds to wait for an Etherscan response before giving up.
    _ETHERSCAN_TIMEOUT = 15

    def __init__(self):
        """Create the Web3 client, store the Etherscan key and report connectivity."""
        self.w3 = Web3(Web3.HTTPProvider(Config.INFURA_URL))
        self.etherscan_api = Config.ETHERSCAN_API_KEY
        # A failed connection is only reported here; it does not abort construction.
        if not self.w3.is_connected():
            print("‚ùå Infura baƒülantƒ±sƒ± kurulamadƒ±! INFURA_PROJECT_ID'nizi kontrol edin.")
        else:
            print("‚úÖ Infura baƒülantƒ±sƒ± ba≈üarƒ±lƒ±.")

    def get_contract_features(self, address):
        """Gather every on-chain feature for ``address``.

        Args:
            address: Contract address as a hex string; it is validated and
                converted to its checksum form before use.

        Returns:
            dict with the keys ``holder_concentration``, ``mint_function``,
            ``lp_locked``, ``max_tax``, ``owner_renounced``,
            ``contract_verified``, ``creation_date``, ``total_supply`` and
            ``holder_count``. On any error (including an invalid address) the
            values from ``_get_default_features`` are returned instead.
        """
        try:
            # Validate the contract address before any network call.
            if not self.w3.is_address(address):
                raise ValueError("Ge√ßersiz kontrat adresi")

            address = self.w3.to_checksum_address(address)
            print(f"üîç Analiz edilen adres (checksum): {address}")

            print("-> Token bilgileri alƒ±nƒ±yor...")
            token_info = self._get_token_info(address)
            print(f"<- Token bilgileri: {token_info}")

            print("-> Holder daƒüƒ±lƒ±mƒ± alƒ±nƒ±yor...")
            holder_data = self._get_holder_distribution(address)
            print(f"<- Holder sayƒ±sƒ±: {len(holder_data)}")

            print("-> Kontrat kaynak kodu alƒ±nƒ±yor...")
            contract_code = self._get_contract_source(address)
            print(f"<- Kontrat doƒürulanmƒ±≈ü: {contract_code.get('verified', False)}")

            print("-> Likidite kilidi kontrol ediliyor...")
            liquidity_info = self._check_liquidity_lock(address)
            print(f"<- LP kilitli: {liquidity_info.get('locked', False)}")

            print("-> Owner renounce kontrol ediliyor...")
            owner_renounced = self._check_ownership_renounced(address)
            print(f"<- Owner renounce edilmi≈ü: {owner_renounced}")

            print("-> Kontrat olu≈üturulma tarihi alƒ±nƒ±yor...")
            creation_date = self._get_creation_date(address)
            print(f"<- Olu≈üturulma tarihi (timestamp): {creation_date}")

            return {
                "holder_concentration": self._calculate_concentration(holder_data),
                "mint_function": self._check_mint_function(contract_code),
                "lp_locked": liquidity_info.get("locked", False),
                "max_tax": self._extract_max_tax(contract_code),
                "owner_renounced": owner_renounced,
                "contract_verified": contract_code.get("verified", False),
                "creation_date": creation_date,
                "total_supply": token_info.get("total_supply", 0),
                # Only one page of holders is fetched, so this is the page size at most.
                "holder_count": len(holder_data)
            }
        except Exception as e:
            print(f"‚ùå OnchainChecker hatasƒ±: {e}")
            return self._get_default_features()

    def _etherscan_get(self, params):
        """Send one GET request to the Etherscan API and return the decoded JSON.

        Args:
            params: Query parameters WITHOUT the API key. Must contain
                ``"action"``, which also labels the two log lines.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.exceptions.RequestException: raised by ``requests`` for
                transport failures and by ``raise_for_status`` for HTTP errors.
        """
        action = params["action"]
        # Log the request with the API key masked so it never reaches the output.
        query = "&".join(f"{key}={value}" for key, value in params.items())
        print(f"  Etherscan API √ßaƒürƒ±sƒ± ({action}): {self._ETHERSCAN_URL}?{query}&apikey=...")

        response = requests.get(
            self._ETHERSCAN_URL,
            params={**params, "apikey": self.etherscan_api},
            timeout=self._ETHERSCAN_TIMEOUT,
        )
        response.raise_for_status()
        data = response.json()
        print(f"  Etherscan API yanƒ±tƒ± ({action} status): {data.get('status')}, message: {data.get('message')}")
        return data

    def _get_token_info(self, address):
        """Read ``totalSupply()`` from the contract.

        Returns:
            ``{"total_supply": <int>}``; the value is 0 when the call fails.
        """
        try:
            # Minimal ERC-20 ABI: only the function that is actually called.
            erc20_abi = [
                {
                    "constant": True,
                    "inputs": [],
                    "name": "totalSupply",
                    "outputs": [{"name": "", "type": "uint256"}],
                    "type": "function"
                }
            ]

            contract = self.w3.eth.contract(address=address, abi=erc20_abi)
            total_supply = contract.functions.totalSupply().call()

            return {"total_supply": total_supply}
        except Exception as e:
            print(f"  _get_token_info hatasƒ±: {e}")
            return {"total_supply": 0}

    def _get_holder_distribution(self, address):
        """Fetch the first page (``offset`` 100) of token holders from Etherscan.

        Returns:
            The ``result`` value of the response when the API reports status
            ``"1"``; otherwise an empty list (also on request or parsing errors).
        """
        try:
            data = self._etherscan_get({
                "module": "token",
                "action": "tokenholderlist",
                "contractaddress": address,
                "page": 1,
                "offset": 100,
            })

            if data.get("status") == "1":
                return data.get("result", [])

            print(f"  Etherscan API'den holder bilgisi alƒ±namadƒ±: {data.get('message', 'Bilinmeyen hata')}")
            return []
        except requests.exceptions.RequestException as e:
            print(f"  _get_holder_distribution istek hatasƒ±: {e}")
            return []
        except Exception as e:
            print(f"  _get_holder_distribution genel hata: {e}")
            return []

    def _calculate_concentration(self, holders):
        """Return the share (0-100) of the listed balances held by the 10 largest holders.

        Args:
            holders: List of dicts carrying a ``"TokenHolderQuantity"`` value.

        Returns:
            100 when there is no holder data or the balances sum to zero
            (missing data is treated as high risk), 75 when the list cannot be
            processed at all, otherwise the percentage capped at 100.
        """
        if not holders:
            return 100  # No data: assume the worst.

        try:
            # Parse every balance exactly once; unparsable entries are skipped.
            balances = []
            for holder in holders:
                raw_quantity = holder.get("TokenHolderQuantity", 0)
                try:
                    balances.append(float(raw_quantity))
                except (TypeError, ValueError):
                    # TypeError covers None / non-string values, which previously
                    # escaped to the generic handler and forced the 75 fallback.
                    print(f"  _calculate_concentration ValueError: {raw_quantity} float'a √ßevrilemedi.")

            # Do not rely on the API returning holders ordered by balance.
            balances.sort(reverse=True)
            top_10_balance = sum(balances[:10])
            listed_supply = sum(balances)

            if listed_supply == 0:
                return 100

            concentration = (top_10_balance / listed_supply) * 100
            return min(concentration, 100)
        except Exception as e:
            print(f"  _calculate_concentration genel hata: {e}")
            return 75  # Fallback value.

    def _get_contract_source(self, address):
        """Fetch the contract source code from Etherscan.

        Returns:
            ``{"source_code": <str>, "verified": <bool>}``. ``verified`` is
            True only for a non-empty source that is not the literal
            "Contract source code not verified" message. On any failure the
            source is empty and ``verified`` is False.
        """
        unverified = {"source_code": "", "verified": False}
        try:
            data = self._etherscan_get({
                "module": "contract",
                "action": "getsourcecode",
                "address": address,
            })

            if data.get("status") == "1" and data.get("result"):
                source_code = data["result"][0].get("SourceCode", "")
                return {
                    "source_code": source_code,
                    "verified": len(source_code) > 0 and source_code != "Contract source code not verified"
                }

            print(f"  Etherscan API'den kaynak kodu alƒ±namadƒ±: {data.get('message', 'Bilinmeyen hata')}")
            return unverified
        except requests.exceptions.RequestException as e:
            print(f"  _get_contract_source istek hatasƒ±: {e}")
            return unverified
        except Exception as e:
            print(f"  _get_contract_source genel hata: {e}")
            return unverified

    def _check_mint_function(self, contract_data):
        """Return True when the lower-cased source contains any mint-related keyword."""
        source_code = contract_data.get("source_code", "").lower()
        mint_keywords = ("function mint", "mint(", "_mint(", "mintto", "mint_")
        return any(keyword in source_code for keyword in mint_keywords)

    def _extract_max_tax(self, contract_data):
        """Return the largest integer assigned to a tax/fee-like name in the source.

        The lower-cased source is scanned for ``<...tax|...fee> = <digits>``
        (for example ``uint256 public buyTax = 5;``). Returns 0 when nothing
        matches.
        """
        # Imported locally because the module-level imports do not include ``re``.
        import re

        source_code = contract_data.get("source_code", "").lower()

        # "buytax" and "selltax" end in "tax", so matching "tax" covers them too.
        assignment = re.compile(r"(?:tax|fee)\s*=\s*(\d+)")

        max_tax = 0
        for digits in assignment.findall(source_code):
            try:
                max_tax = max(max_tax, int(digits))
            except ValueError:
                # Skip values that int() refuses to convert.
                print(f"  _extract_max_tax ValueError: {digits} int'e √ßevrilemedi.")

        return max_tax

    def _check_ownership_renounced(self, address):
        """Return True when the contract's ``owner()`` call yields the zero address.

        Any failure of the call (it is not established here which contracts
        expose ``owner()``) is reported and treated as "not renounced".
        """
        try:
            owner_abi = [{
                "constant": True,
                "inputs": [],
                "name": "owner",
                "outputs": [{"name": "", "type": "address"}],
                "type": "function"
            }]

            contract = self.w3.eth.contract(address=address, abi=owner_abi)
            owner = contract.functions.owner().call()

            # Ownership held by 0x000...000 means it has been renounced.
            return owner == "0x0000000000000000000000000000000000000000"
        except Exception as e:
            print(f"  _check_ownership_renounced hatasƒ±: {e}")
            return False

    def _check_liquidity_lock(self, address):
        """Placeholder liquidity-lock check; always reports an unlocked pool.

        A real implementation would have to inspect the DEX pool contracts;
        for now a constant result is returned and ``address`` is unused.
        """
        print("  _check_liquidity_lock: Basit kontrol, her zaman False d√∂nd√ºr√ºyor.")
        return {"locked": False, "lock_time": 0}

    def _get_creation_date(self, address):
        """Return the timestamp of the earliest transaction listed for ``address``.

        Requests a single ``txlist`` entry sorted ascending and returns its
        ``timeStamp`` as an int; returns 0 when the API reports no result or
        the request fails.
        """
        try:
            data = self._etherscan_get({
                "module": "account",
                "action": "txlist",
                "address": address,
                "startblock": 0,
                "endblock": 99999999,
                "page": 1,
                "offset": 1,
                "sort": "asc",
            })

            if data.get("status") == "1" and data.get("result"):
                return int(data["result"][0].get("timeStamp", 0))

            print(f"  Etherscan API'den olu≈üturulma tarihi alƒ±namadƒ±: {data.get('message', 'Bilinmeyen hata')}")
            return 0
        except requests.exceptions.RequestException as e:
            print(f"  _get_creation_date istek hatasƒ±: {e}")
            return 0
        except Exception as e:
            print(f"  _get_creation_date genel hata: {e}")
            return 0

    def _get_default_features(self):
        """Return the pessimistic feature set used when analysis fails."""
        return {
            "holder_concentration": 75,
            "mint_function": True,
            "lp_locked": False,
            "max_tax": 25,
            "owner_renounced": False,
            "contract_verified": False,
            "creation_date": 0,
            "total_supply": 0,
            "holder_count": 0
        }

Writing utils/onchain_checker.py


In [None]:
%%writefile utils/social_analysis.py
import requests
import tweepy
from config import Config
from datetime import datetime, timedelta

class SocialAnalyzer:
    """Derive simple risk signals from recent tweets about a token."""

    def __init__(self):
        """Create the Twitter client when a bearer token is configured.

        ``self.twitter_client`` stays ``None`` when the token is missing or
        the client cannot be constructed; analysis is then skipped.
        """
        self.twitter_client = None

        if not Config.TWITTER_BEARER_TOKEN:
            print("‚ö†Ô∏è TWITTER_BEARER_TOKEN ayarlanmamƒ±≈ü. Sosyal medya analizi yapƒ±lamayacak.")
            return

        try:
            # No verification request (e.g. get_me) is issued here; such a call
            # may count against the API quota.
            self.twitter_client = tweepy.Client(bearer_token=Config.TWITTER_BEARER_TOKEN)
            print("‚úÖ Twitter istemcisi ba≈ülatƒ±ldƒ±.")
        except Exception as e:
            print(f"‚ùå Twitter istemcisi ba≈ülatƒ±lƒ±rken hata olu≈ütu: {e}")
            self.twitter_client = None

    def analyze_social_signals(self, token_name, contract_address):
        """Search recent tweets about the token and summarise them as signals.

        Args:
            token_name: Token name; used as a quoted phrase when it has more
                than two characters after stripping.
            contract_address: Fallback search term; its first 10 characters
                are used when the token name is unusable.

        Returns:
            dict with ``twitter_mentions`` (int), ``sentiment_score`` (0-1
            float), ``bot_activity`` (bool) and ``pump_dump_keywords`` (bool).
            The neutral defaults are returned when no client is available, no
            query can be built, no tweets are found, or the API call fails.
        """
        signals = {
            "twitter_mentions": 0,
            "sentiment_score": 0.5,  # Range 0-1, 0.5 is neutral.
            "bot_activity": False,
            "pump_dump_keywords": False
        }

        if not self.twitter_client:
            print("‚ö†Ô∏è Twitter istemcisi kullanƒ±lamƒ±yor. Sosyal medya analizi atlandƒ±.")
            return signals

        try:
            final_query = self._build_search_query(token_name, contract_address)
            if final_query is None:
                print("‚ö†Ô∏è Twitter sorgusu i√ßin yeterli bilgi (token adƒ± veya kontrat adresi) yok.")
                return signals

            print(f"  Twitter API sorgusu: {final_query}")

            tweets = self.twitter_client.search_recent_tweets(
                query=final_query,
                max_results=10,  # Kept small while testing.
                tweet_fields=["created_at", "author_id", "public_metrics", "text"]
            )

            if tweets and tweets.data:
                print(f"  Twitter API {len(tweets.data)} tweet buldu.")
                signals["twitter_mentions"] = len(tweets.data)
                signals["pump_dump_keywords"] = self._check_pump_dump_keywords(tweets.data)
                signals["bot_activity"] = self._detect_bot_activity(tweets.data)
                signals["sentiment_score"] = self._calculate_sentiment(tweets.data)
            else:
                print("  Twitter API tweet bulamadƒ±.")
                signals["twitter_mentions"] = 0

        except tweepy.errors.TweepyException as e:
            print(f"‚ùå Twitter API hatasƒ± (TweepyException): {e}")
            # Dump whatever detail the HTTP response carries.
            response = getattr(e, "response", None)
            if response is not None:
                print(f"  HTTP Status Code: {response.status_code}")
                try:
                    print(f"  Response Body: {response.json()}")
                except Exception:
                    # Body is not decodable as JSON; show the raw text instead.
                    # (Was a bare ``except:``, which also trapped
                    # KeyboardInterrupt and SystemExit.)
                    print(f"  Response Body: {response.text}")

        except Exception as e:
            print(f"‚ùå Sosyal medya analizi genel hata: {e}")

        return signals

    def _build_search_query(self, token_name, contract_address):
        """Build the recent-search query, or return None when no term is usable.

        The token name takes priority; the address prefix is only a fallback.
        Retweets are excluded and results are restricted to English.
        """
        if token_name and len(token_name.strip()) > 2:
            term = f'"{token_name.strip()}"'
        elif contract_address and len(contract_address) > 10:
            term = f'{contract_address[:10]}'
        else:
            return None
        return f"{term} -is:retweet lang:en"

    def _check_pump_dump_keywords(self, tweets):
        """Return True when any tweet text contains a pump-and-dump phrase.

        Args:
            tweets: Iterable of objects exposing a ``text`` attribute.
        """
        pump_keywords = [
            "moon", "üöÄ", "pump", "easy money", "quick profit",
            "100x", "1000x", "get rich", "diamond hands", "ape in"
        ]

        # Join all texts so each phrase is searched once.
        combined_text = " ".join(tweet.text.lower() for tweet in tweets)
        return any(keyword in combined_text for keyword in pump_keywords)

    def _detect_bot_activity(self, tweets):
        """Return True when fewer than half of the tweets have a unique text.

        Fewer than 5 tweets are considered too little evidence and always
        yield False.
        """
        if len(tweets) < 5:
            return False

        # Compare texts with surrounding whitespace removed.
        tweet_texts = [tweet.text.strip() for tweet in tweets]
        unique_ratio = len(set(tweet_texts)) / len(tweet_texts)

        is_bot = unique_ratio < 0.5
        if is_bot:
            print("  Bot aktivitesi ≈ü√ºphesi: Tekrarlayan tweetler tespit edildi.")
        return is_bot

    def _calculate_sentiment(self, tweets):
        """Return the mean per-tweet sentiment in the range 0-1.

        Each tweet scores 1.0 when it contains more positive than negative
        words (substring match), 0.0 for the opposite and 0.5 on a tie. An
        empty input yields the neutral 0.5.
        """
        positive_words = ["good", "great", "amazing", "bullish", "buy", "long", "holding", "hodl", "strong"]
        negative_words = ["scam", "rug", "fake", "avoid", "dump", "short", "sell", "warning", "risk"]

        if not tweets:
            return 0.5

        total_score = 0
        for tweet in tweets:
            text = tweet.text.lower()
            pos_count = sum(word in text for word in positive_words)
            neg_count = sum(word in text for word in negative_words)

            if pos_count > neg_count:
                total_score += 1.0
            elif neg_count > pos_count:
                total_score += 0.0
            else:
                total_score += 0.5

        return total_score / len(tweets)

Writing utils/social_analysis.py


In [None]:
%%writefile utils/code_analysis.py
import re

class CodeAnalyzer:
    """Flag risky constructs in Solidity source code using regex heuristics."""

    def __init__(self):
        """Define the regex patterns, grouped by risk category."""
        self.risk_patterns = {
            "mint_function": [
                r"function\s+mint\s*\(",
                r"function\s+_mint\s*\(",
                r"\.mint\s*\(",
                r"mintTo\s*\("
            ],
            "ownership_issues": [
                r"onlyOwner",
                r"_owner\s*=",
                r"transferOwnership",
                r"renounceOwnership"
            ],
            "liquidity_removal": [
                r"removeLiquidity",
                r"withdraw.*Liquidity",
                r"emergencyWithdraw"
            ],
            "tax_functions": [
                r"buyTax\s*=",
                r"sellTax\s*=",
                r"setTax",
                r"updateTax"
            ],
            "blacklist_functions": [
                r"blacklist",
                r"addBot",
                r"removeBot",
                r"setBot"
            ],
            "pause_functions": [
                r"pause\s*\(",
                r"unpause\s*\(",
                r"setPaused"
            ]
        }

    def analyze_contract_code(self, address, source_code=""):
        """Analyse contract source and return a list of risk-flag strings.

        Args:
            address: Contract address; accepted for interface symmetry but not
                used by the analysis.
            source_code: Source text. When empty, a single "could not be
                verified" warning is returned.

        Returns:
            List of flag strings in a fixed category order; a single "clean"
            entry when no category matched.
        """
        if not source_code:
            return ["‚ö†Ô∏è Kontrat kodu doƒürulanamadƒ±"]

        code = source_code.lower()
        patterns = self.risk_patterns
        flags = []

        # Mint capability.
        if self._check_pattern_exists(code, patterns["mint_function"]):
            flags.append("üî¥ Mint fonksiyonu tespit edildi")

        # Owner privileges, unless the source also shows a renounce pattern.
        if self._check_pattern_exists(code, patterns["ownership_issues"]):
            if not self._check_ownership_renounced(code):
                flags.append("üî¥ Owner haklarƒ± korunuyor")

        # Functions able to pull liquidity.
        if self._check_pattern_exists(code, patterns["liquidity_removal"]):
            flags.append("üî¥ Likidite √ßekme fonksiyonu var")

        # Tax settings: only rates above 10 are flagged.
        if self._check_pattern_exists(code, patterns["tax_functions"]):
            tax_rate = self._extract_tax_rate(code)
            if tax_rate > 10:
                flags.append(f"üî¥ Y√ºksek vergi oranƒ±: %{tax_rate}")

        # Blacklisting.
        if self._check_pattern_exists(code, patterns["blacklist_functions"]):
            flags.append("üî¥ Blacklist fonksiyonu tespit edildi")

        # Ability to pause trading.
        if self._check_pattern_exists(code, patterns["pause_functions"]):
            flags.append("üî¥ Trading durdurma fonksiyonu var")

        # Honeypot-style constructs.
        if self._check_honeypot_patterns(code):
            flags.append("üî¥ Honeypot kalƒ±plarƒ± tespit edildi")

        if not flags:
            flags.append("‚úÖ Kod analizi temiz")

        return flags

    def _check_pattern_exists(self, code, patterns):
        """Return True when any regex in ``patterns`` matches ``code`` (case-insensitive)."""
        return any(re.search(pattern, code, re.IGNORECASE) for pattern in patterns)

    def _check_ownership_renounced(self, code):
        """Return True when the source text contains a renounce pattern.

        This is a text-level heuristic: a ``renounceownership()`` occurrence,
        an ``_owner = ...0x0...`` assignment, or ``owner ... = ... address(0)``
        all count. It does not query on-chain state.
        """
        renounce_patterns = [
            r"renounceownership\s*\(\s*\)",
            r"_owner\s*=.*0x0+",
            r"owner.*=.*address\(0\)"
        ]
        return self._check_pattern_exists(code, renounce_patterns)

    def _extract_tax_rate(self, code):
        """Return the largest integer assigned to an identifier containing "tax".

        Matches assignments such as ``buytax = 5``, ``selltax=12`` or
        ``taxfee = 3``; returns 0 when there is none.
        """
        # ``\w*`` keeps the match inside the identifier that contains "tax".
        # The former ``tax.*=\s*(\d+)`` was greedy and ran to the LAST
        # ``= <digits>`` on the line, reporting unrelated constants (for
        # example a max-wallet amount) as the tax rate.
        tax_assignment = r"tax\w*\s*=\s*(\d+)"

        max_tax = 0
        for digits in re.findall(tax_assignment, code, re.IGNORECASE):
            max_tax = max(max_tax, int(digits))

        return max_tax

    def _check_honeypot_patterns(self, code):
        """Return True when the source matches any honeypot-style pattern."""
        honeypot_patterns = [
            r"require.*balances\[.*\]\s*>=",  # Balance requirement
            r"if.*balances\[.*\].*return false",  # Transfer blocked
            r"mapping.*bool.*isexcluded",  # Exclusion mapping
            r"tradingopen\s*=\s*false"  # Trading disabled
        ]
        return self._check_pattern_exists(code, honeypot_patterns)

Overwriting utils/code_analysis.py


In [None]:
%%writefile utils/feature_engineering.py
import numpy as np
from datetime import datetime

class FeatureEngineer:
    """Turn raw on-chain and social data into a numeric feature vector."""

    def build_features(self, onchain_data, social_data=None):
        """Build the 12-element feature vector for the ML model.

        Args:
            onchain_data: dict of on-chain features; missing keys fall back to
                pessimistic defaults.
            social_data: optional dict of social signals; when falsy, neutral
                defaults are used.

        Returns:
            ``numpy.ndarray`` whose element order matches ``get_feature_names``.
        """
        # Core on-chain values; booleans are encoded as 0/1.
        vector = [
            onchain_data.get("holder_concentration", 75),
            int(onchain_data.get("mint_function", True)),
            int(onchain_data.get("lp_locked", False)),
            onchain_data.get("max_tax", 25),
            int(onchain_data.get("owner_renounced", False)),
            int(onchain_data.get("contract_verified", False))
        ]

        # Contract age in days, capped at one year; 0 when the date is unknown.
        created_at = onchain_data.get("creation_date", 0)
        if created_at > 0:
            elapsed_days = (datetime.now().timestamp() - created_at) / 86400
            vector.append(min(elapsed_days, 365))
        else:
            vector.append(0)

        # Holder count on a log10 scale, clamped to at least 1 to avoid log(0).
        holders = onchain_data.get("holder_count", 1)
        vector.append(np.log10(max(holders, 1)))

        # Social signals, or their neutral defaults.
        if social_data:
            social_part = [
                social_data.get("twitter_mentions", 0),
                social_data.get("sentiment_score", 0.5) * 100,  # Rescaled to 0-100.
                int(social_data.get("bot_activity", False)),
                int(social_data.get("pump_dump_keywords", False))
            ]
        else:
            social_part = [0, 50, 0, 0]
        vector.extend(social_part)

        return np.array(vector)

    def get_feature_names(self):
        """Return the feature names in the order used by ``build_features``."""
        onchain_names = [
            "holder_concentration",
            "mint_function",
            "lp_locked",
            "max_tax",
            "owner_renounced",
            "contract_verified",
            "age_days",
            "log_holder_count",
        ]
        social_names = [
            "twitter_mentions",
            "sentiment_score",
            "bot_activity",
            "pump_dump_keywords",
        ]
        return onchain_names + social_names

Writing utils/feature_engineering.py


In [None]:
%%writefile main.py

import gradio as gr
import sys
import os
import requests
from web3 import Web3
from dotenv import load_dotenv
import json
import time
import numpy as np
import pandas as pd
from datetime import datetime
import re
import tweepy # Sadece Config.TWITTER_BEARER_TOKEN varsa kullanƒ±lacak

# Load environment variables from .env
load_dotenv()

class Config:
    """API credentials read from the process environment at class-definition time.

    Every value is ``None`` when its variable is unset. ``INFURA_URL`` is
    always built, so it ends in ``/None`` when no project id is configured.
    """

    INFURA_PROJECT_ID = os.environ.get("INFURA_PROJECT_ID")
    ETHERSCAN_API_KEY = os.environ.get("ETHERSCAN_API_KEY")
    TWITTER_BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN")
    # Mainnet endpoint derived from the project id above.
    INFURA_URL = "https://mainnet.infura.io/v3/{}".format(INFURA_PROJECT_ID)

# Contents of utils/onchain_checker.py
class OnchainChecker:
    """Collect on-chain risk signals for a token contract.

    Direct contract calls go through a Web3 HTTP provider built from
    ``Config.INFURA_URL``; holder lists, source code and the first transaction
    come from the Etherscan HTTP API. Every lookup is best-effort: failures
    are printed and replaced by a pessimistic default instead of raising.
    """

    # Etherscan endpoint shared by every API call in this class.
    ETHERSCAN_API_URL = "https://api.etherscan.io/api"
    # Per-request timeout, in seconds, for Etherscan calls.
    REQUEST_TIMEOUT = 15
    # Value of owner() that is treated as "ownership renounced".
    ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
    # Matches assignments such as "buytax = 5" or "fee=3" in lower-cased source.
    _TAX_ASSIGNMENT = re.compile(r"(?:tax|fee)\s*=\s*(\d+)")

    def __init__(self):
        """Create the Web3 client, or disable on-chain analysis if keys are missing.

        When either the Infura project id or the Etherscan key is unset, both
        ``self.w3`` and ``self.etherscan_api`` are ``None`` and every lookup
        short-circuits to its default value.
        """
        if not Config.INFURA_PROJECT_ID or not Config.ETHERSCAN_API_KEY:
            print("‚ùå API anahtarlarƒ± (Infura, Etherscan) ayarlanmamƒ±≈ü! On-chain analiz yapƒ±lamayacak.")
            self.w3 = None
            self.etherscan_api = None
            return

        self.w3 = Web3(Web3.HTTPProvider(Config.INFURA_URL))
        self.etherscan_api = Config.ETHERSCAN_API_KEY
        # The connection check is informational only; construction continues
        # even when the node cannot be reached.
        if not self.w3.is_connected():
            print("‚ùå Infura baƒülantƒ±sƒ± kurulamadƒ±! INFURA_PROJECT_ID'nizi kontrol edin.")
        else:
            print("‚úÖ Infura baƒülantƒ±sƒ± ba≈üarƒ±lƒ±.")

    def get_contract_features(self, address):
        """Return the on-chain feature dict for the contract at *address*.

        Args:
            address: Contract address as a hex string; it is validated and
                converted to checksum form before use.

        Returns:
            A dict with the keys ``holder_concentration``, ``mint_function``,
            ``lp_locked``, ``max_tax``, ``owner_renounced``,
            ``contract_verified``, ``creation_date``, ``total_supply`` and
            ``holder_count``. When API keys are missing, the address is
            invalid, or any step raises, the values of
            ``_get_default_features()`` are returned instead.
        """
        if self.w3 is None or self.etherscan_api is None:
            print("‚ùå API anahtarlarƒ± eksik, on-chain analiz atlandƒ±.")
            return self._get_default_features()

        try:
            # An invalid address is reported through the generic handler below.
            if not self.w3.is_address(address):
                raise ValueError("Ge√ßersiz kontrat adresi")

            address = self.w3.to_checksum_address(address)
            print(f"üîç Analiz edilen adres (checksum): {address}")

            print("-> Token bilgileri alƒ±nƒ±yor...")
            token_info = self._get_token_info(address)
            print(f"<- Token bilgileri: {token_info}")

            print("-> Holder daƒüƒ±lƒ±mƒ± alƒ±nƒ±yor...")
            holder_data = self._get_holder_distribution(address)
            print(f"<- Holder sayƒ±sƒ±: {len(holder_data)}")

            print("-> Kontrat kaynak kodu alƒ±nƒ±yor...")
            contract_code = self._get_contract_source(address)
            print(f"<- Kontrat doƒürulanmƒ±≈ü: {contract_code.get('verified', False)}")

            print("-> Likidite kilidi kontrol ediliyor...")
            liquidity_info = self._check_liquidity_lock(address)
            print(f"<- LP kilitli: {liquidity_info.get('locked', False)}")

            print("-> Owner renounce kontrol ediliyor...")
            owner_renounced = self._check_ownership_renounced(address)
            print(f"<- Owner renounce edilmi≈ü: {owner_renounced}")

            print("-> Kontrat olu≈üturulma tarihi alƒ±nƒ±yor...")
            creation_date = self._get_creation_date(address)
            print(f"<- Olu≈üturulma tarihi (timestamp): {creation_date}")

            return {
                "holder_concentration": self._calculate_concentration(holder_data),
                "mint_function": self._check_mint_function(contract_code),
                "lp_locked": liquidity_info.get("locked", False),
                "max_tax": self._extract_max_tax(contract_code),
                "owner_renounced": owner_renounced,
                "contract_verified": contract_code.get("verified", False),
                "creation_date": creation_date,
                "total_supply": token_info.get("total_supply", 0),
                "holder_count": len(holder_data)
            }
        except Exception as e:
            print(f"‚ùå OnchainChecker hatasƒ±: {e}")
            return self._get_default_features()

    def _etherscan_get(self, **params):
        """Query Etherscan with *params* plus the API key and return the decoded JSON.

        Transport errors, HTTP error statuses and JSON decoding errors are not
        handled here; they propagate to the calling lookup method.
        """
        params["apikey"] = self.etherscan_api
        response = requests.get(
            self.ETHERSCAN_API_URL, params=params, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    def _get_token_info(self, address):
        """Return ``{"total_supply": n}`` read from the contract's totalSupply().

        Falls back to a supply of 0 when no Web3 client exists or the call fails.
        """
        if self.w3 is None:
            return {"total_supply": 0}
        try:
            # Minimal ERC-20 ABI: only the function that is actually called.
            erc20_abi = [
                {
                    "constant": True,
                    "inputs": [],
                    "name": "totalSupply",
                    "outputs": [{"name": "", "type": "uint256"}],
                    "type": "function"
                }
            ]

            contract = self.w3.eth.contract(address=address, abi=erc20_abi)
            total_supply = contract.functions.totalSupply().call()

            return {"total_supply": total_supply}
        except Exception as e:
            print(f"  _get_token_info hatasƒ±: {e}")
            return {"total_supply": 0}

    def _get_holder_distribution(self, address):
        """Return the first page (up to 100 rows) of Etherscan's holder list.

        Returns an empty list when no API key is configured, the request
        fails, or Etherscan answers with a status other than "1".
        """
        if self.etherscan_api is None:
            return []
        try:
            data = self._etherscan_get(
                module="token",
                action="tokenholderlist",
                contractaddress=address,
                page=1,
                offset=100,
            )

            if data.get("status") == "1":
                return data.get("result", [])

            print(f"  Etherscan API'den holder bilgisi alƒ±namadƒ±: {data.get('message', 'Bilinmeyen hata')}")
            return []
        except requests.exceptions.RequestException as e:
            print(f"  _get_holder_distribution istek hatasƒ±: {e}")
            return []
        except Exception as e:
            print(f"  _get_holder_distribution genel hata: {e}")
            return []

    def _calculate_concentration(self, holders):
        """Return the percentage (0-100) of listed balances held by the 10 largest holders.

        The denominator is the sum of the balances in *holders*, not the
        token's total supply. Rows whose ``TokenHolderQuantity`` cannot be
        converted to a number (including ``None``) are reported and skipped.

        Returns:
            100 when there is no usable data (treated as maximum risk), 75 if
            an unexpected error occurs, otherwise the computed percentage.
        """
        if not holders:
            return 100  # no data: assume the worst

        try:
            balances = []
            for holder in holders:
                raw_quantity = holder.get("TokenHolderQuantity", 0)
                try:
                    balances.append(float(raw_quantity))
                except (TypeError, ValueError):
                    print(f"  _calculate_concentration ValueError: {raw_quantity} float'a √ßevrilemedi.")

            total_supply = sum(balances)
            if total_supply == 0:
                return 100

            # Rank by balance so "top 10" does not depend on the row order.
            top_10_balance = sum(sorted(balances, reverse=True)[:10])
            concentration = (top_10_balance / total_supply) * 100
            return min(concentration, 100)
        except Exception as e:
            print(f"  _calculate_concentration genel hata: {e}")
            return 75

    def _get_contract_source(self, address):
        """Return ``{"source_code": str, "verified": bool}`` from Etherscan.

        ``verified`` is true only for a non-empty source that is not
        Etherscan's "not verified" placeholder text. Any failure yields an
        empty, unverified result.
        """
        unverified = {"source_code": "", "verified": False}
        if self.etherscan_api is None:
            return unverified
        try:
            data = self._etherscan_get(
                module="contract",
                action="getsourcecode",
                address=address,
            )

            if data.get("status") == "1" and data.get("result"):
                # Normalise a null SourceCode to "" so callers can call .lower().
                source_code = data["result"][0].get("SourceCode") or ""
                return {
                    "source_code": source_code,
                    "verified": len(source_code) > 0 and source_code != "Contract source code not verified"
                }

            print(f"  Etherscan API'den kaynak kodu alƒ±namadƒ±: {data.get('message', 'Bilinmeyen hata')}")
            return unverified
        except requests.exceptions.RequestException as e:
            print(f"  _get_contract_source istek hatasƒ±: {e}")
            return unverified
        except Exception as e:
            print(f"  _get_contract_source genel hata: {e}")
            return unverified

    def _check_mint_function(self, contract_data):
        """Return True if the lower-cased source contains any mint-related keyword.

        This is a plain substring search, so internal ``_mint(`` helpers are
        matched as well.
        """
        source_code = contract_data.get("source_code", "").lower()
        mint_keywords = ["function mint", "mint(", "_mint(", "mintto", "mint_"]
        return any(keyword in source_code for keyword in mint_keywords)

    def _extract_max_tax(self, contract_data):
        """Return the largest integer assigned to a name ending in "tax" or "fee".

        The source is lower-cased first, so e.g. ``buyTax = 5`` and ``Fee=3``
        both match. Returns 0 when nothing matches.
        """
        source_code = contract_data.get("source_code", "").lower()

        max_tax = 0
        for match in self._TAX_ASSIGNMENT.findall(source_code):
            try:
                max_tax = max(max_tax, int(match))
            except ValueError:
                print(f"  _extract_max_tax ValueError: {match} int'e √ßevrilemedi.")
        return max_tax

    def _check_ownership_renounced(self, address):
        """Return True if the contract's owner() is the zero address.

        Returns False when no Web3 client exists or the call fails (for
        example because the contract has no owner() function).
        """
        if self.w3 is None:
            return False
        try:
            owner_abi = [{
                "constant": True,
                "inputs": [],
                "name": "owner",
                "outputs": [{"name": "", "type": "address"}],
                "type": "function"
            }]

            contract = self.w3.eth.contract(address=address, abi=owner_abi)
            owner = contract.functions.owner().call()
            return owner == self.ZERO_ADDRESS
        except Exception as e:
            print(f"  _check_ownership_renounced hatasƒ±: {e}")
            return False

    def _check_liquidity_lock(self, address):
        """Placeholder liquidity-lock check: always reports "not locked".

        A real implementation would have to inspect the DEX pool contracts;
        *address* is currently unused.
        """
        return {"locked": False, "lock_time": 0}

    def _get_creation_date(self, address):
        """Return the Unix timestamp of the earliest transaction Etherscan lists.

        Returns 0 when no API key is configured, nothing is found, or the
        request fails.
        """
        if self.etherscan_api is None:
            return 0
        try:
            # Ascending sort with a page size of 1 yields only the oldest row.
            data = self._etherscan_get(
                module="account",
                action="txlist",
                address=address,
                startblock=0,
                endblock=99999999,
                page=1,
                offset=1,
                sort="asc",
            )

            if data.get("status") == "1" and data.get("result"):
                return int(data["result"][0].get("timeStamp", 0))

            print(f"  Etherscan API'den olu≈üturulma tarihi alƒ±namadƒ±: {data.get('message', 'Bilinmeyen hata')}")
            return 0
        except requests.exceptions.RequestException as e:
            print(f"  _get_creation_date istek hatasƒ±: {e}")
            return 0
        except Exception as e:
            print(f"  _get_creation_date genel hata: {e}")
            return 0

    def _get_default_features(self):
        """Return the pessimistic feature values used when analysis is impossible."""
        return {
            "holder_concentration": 75,
            "mint_function": True,
            "lp_locked": False,
            "max_tax": 25,
            "owner_renounced": False,
            "contract_verified": False,
            "creation_date": 0,
            "total_supply": 0,
            "holder_count": 0
        }

# Contents of utils/social_analysis.py
class SocialAnalyzer:
    """Derive simple social-media risk signals for a token from recent tweets.

    All analysis is keyword based. When no Twitter client is available the
    neutral default signals are returned.
    """

    def __init__(self):
        """Create the tweepy client when ``Config.TWITTER_BEARER_TOKEN`` is set.

        ``self.twitter_client`` is ``None`` when the token is missing or the
        client cannot be constructed.
        """
        self.twitter_client = None
        if not Config.TWITTER_BEARER_TOKEN:
            print("‚ö†Ô∏è TWITTER_BEARER_TOKEN ayarlanmamƒ±≈ü. Sosyal medya analizi yapƒ±lamayacak.")
            return

        try:
            self.twitter_client = tweepy.Client(bearer_token=Config.TWITTER_BEARER_TOKEN)
            print("‚úÖ Twitter istemcisi ba≈ülatƒ±ldƒ±.")
        except Exception as e:
            print(f"‚ùå Twitter istemcisi ba≈ülatƒ±lƒ±rken hata olu≈ütu: {e}")
            self.twitter_client = None

    def analyze_social_signals(self, token_name, contract_address):
        """Search recent tweets about the token and summarise them.

        Args:
            token_name: Token name; used as a quoted phrase when it has more
                than two non-blank characters.
            contract_address: Fallback search term; its first 10 characters
                are used when the name is unusable.

        Returns:
            A dict with ``twitter_mentions`` (int), ``sentiment_score``
            (0-1 float), ``bot_activity`` (bool) and ``pump_dump_keywords``
            (bool). Errors are printed and leave the values computed so far
            (initially the neutral defaults) in place.
        """
        signals = {
            "twitter_mentions": 0,
            "sentiment_score": 0.5,
            "bot_activity": False,
            "pump_dump_keywords": False
        }

        if not self.twitter_client:
            print("‚ö†Ô∏è Twitter istemcisi kullanƒ±lamiyor. Sosyal medya analizi atlandƒ±.")
            return signals

        try:
            final_query = self._build_query(token_name, contract_address)
            if final_query is None:
                print("‚ö†Ô∏è Twitter sorgusu i√ßin yeterli bilgi (token adƒ± veya kontrat adresi) yok.")
                return signals

            print(f"  Twitter API sorgusu: {final_query}")

            tweets = self.twitter_client.search_recent_tweets(
                query=final_query,
                max_results=10,
                tweet_fields=["created_at", "author_id", "public_metrics", "text"]
            )

            if tweets and tweets.data:
                print(f"  Twitter API {len(tweets.data)} tweet buldu.")
                signals["twitter_mentions"] = len(tweets.data)
                signals["pump_dump_keywords"] = self._check_pump_dump_keywords(tweets.data)
                signals["bot_activity"] = self._detect_bot_activity(tweets.data)
                signals["sentiment_score"] = self._calculate_sentiment(tweets.data)
            else:
                print("  Twitter API tweet bulamadƒ±.")
                signals["twitter_mentions"] = 0
        except tweepy.errors.TweepyException as e:
            print(f"‚ùå Twitter API hatasƒ± (TweepyException): {e}")
            self._print_error_response(e)
        except Exception as e:
            print(f"‚ùå Sosyal medya analizi genel hata: {e}")

        return signals

    @staticmethod
    def _build_query(token_name, contract_address):
        """Return the search query string, or None when no usable term exists.

        The token name is preferred; the address prefix is only a fallback.
        Retweets are excluded and results are limited to English.
        """
        if token_name and len(token_name.strip()) > 2:
            term = f'"{token_name.strip()}"'
        elif contract_address and len(contract_address) > 10:
            term = f'{contract_address[:10]}'
        else:
            return None
        return f"{term} -is:retweet lang:en"

    @staticmethod
    def _print_error_response(error):
        """Print the HTTP status and body attached to *error*, if any.

        The body is printed as decoded JSON when possible and as raw text
        otherwise.
        """
        response = getattr(error, "response", None)
        if response is None:
            return

        print(f"  HTTP Status Code: {response.status_code}")
        try:
            print(f"  Response Body: {response.json()}")
        except Exception:
            # Not a bare ``except`` so KeyboardInterrupt/SystemExit still propagate.
            print(f"  Response Body: {response.text}")

    def _check_pump_dump_keywords(self, tweets):
        """Return True if any tweet text contains a pump-and-dump keyword.

        *tweets* is a sequence of objects with a ``text`` attribute; matching
        is a case-insensitive substring search.
        """
        pump_keywords = [
            "moon", "üöÄ", "pump", "easy money", "quick profit",
            "100x", "1000x", "get rich", "diamond hands", "ape in"
        ]

        tweet_texts = " ".join(tweet.text.lower() for tweet in tweets)
        return any(keyword in tweet_texts for keyword in pump_keywords)

    def _detect_bot_activity(self, tweets):
        """Return True when fewer than half of the tweet texts are unique.

        Fewer than five tweets are considered too small a sample and always
        yield False.
        """
        if len(tweets) < 5:
            return False

        tweet_texts = [tweet.text.strip() for tweet in tweets]
        unique_tweets = set(tweet_texts)

        is_bot = len(unique_tweets) / len(tweet_texts) < 0.5
        if is_bot:
            print("  Bot aktivitesi ≈ü√ºphesi: Tekrarlayan tweetler tespit edildi.")
        return is_bot

    def _calculate_sentiment(self, tweets):
        """Return the mean per-tweet sentiment in [0, 1]; 0.5 is neutral.

        Each tweet scores 1.0 when it contains more positive than negative
        keywords, 0.0 for the opposite and 0.5 on a tie. An empty input
        returns 0.5.
        """
        if not tweets:
            return 0.5

        positive_words = ["good", "great", "amazing", "bullish", "buy", "long", "holding", "hodl", "strong"]
        negative_words = ["scam", "rug", "fake", "avoid", "dump", "short", "sell", "warning", "risk"]

        total_score = 0
        for tweet in tweets:
            text = tweet.text.lower()
            pos_count = sum(word in text for word in positive_words)
            neg_count = sum(word in text for word in negative_words)

            if pos_count > neg_count:
                total_score += 1.0
            elif neg_count > pos_count:
                total_score += 0.0
            else:
                total_score += 0.5

        return total_score / len(tweets)

# Contents of utils/code_analysis.py
class CodeAnalyzer:
    """Scan contract source text for patterns commonly associated with scam tokens.

    The analysis is purely textual (regular expressions, case-insensitive);
    it does not parse or execute the contract.
    """

    def __init__(self):
        """Set up ``self.risk_patterns``: regex lists keyed by risk category."""
        self.risk_patterns = {
            "mint_function": [
                r"function\s+mint\s*\(",
                r"function\s+_mint\s*\(",
                r"\.mint\s*\(",
                r"mintTo\s*\("
            ],
            "ownership_issues": [
                r"onlyOwner",
                r"_owner\s*=",
                r"transferOwnership",
                r"renounceOwnership"
            ],
            "liquidity_removal": [
                r"removeLiquidity",
                r"withdraw.*Liquidity",
                r"emergencyWithdraw"
            ],
            "tax_functions": [
                r"buyTax\s*=",
                r"sellTax\s*=",
                r"setTax",
                r"updateTax"
            ],
            "blacklist_functions": [
                r"blacklist",
                r"addBot",
                r"removeBot",
                r"setBot"
            ],
            "pause_functions": [
                r"pause\s*\(",
                r"unpause\s*\(",
                r"setPaused"
            ]
        }

    def analyze_contract_code(self, address, source_code=""):
        """Return a list of human-readable risk flags for *source_code*.

        Args:
            address: Contract address; accepted for interface compatibility
                but not used by the textual analysis.
            source_code: Contract source text. An empty value yields a single
                "could not be verified" flag.

        Returns:
            The flags in a fixed order (mint, ownership, liquidity removal,
            high tax, blacklist, pause, honeypot), or a single "clean" flag
            when nothing matched. The tax flag is only added when the
            extracted rate is above 10.
        """
        flags = []

        if not source_code:
            flags.append("‚ö†Ô∏è Kontrat kodu doƒürulanamadƒ±")
            return flags

        source_code = source_code.lower()

        if self._check_pattern_exists(source_code, self.risk_patterns["mint_function"]):
            flags.append("üî¥ Mint fonksiyonu tespit edildi")

        # Only the presence of ownership-related code is checked here; whether
        # ownership was actually renounced cannot be told from the source text.
        if self._check_pattern_exists(source_code, self.risk_patterns["ownership_issues"]):
            flags.append("üî¥ Ownership patternleri tespit edildi")

        if self._check_pattern_exists(source_code, self.risk_patterns["liquidity_removal"]):
            flags.append("üî¥ Likidite √ßekme fonksiyonu var")

        if self._check_pattern_exists(source_code, self.risk_patterns["tax_functions"]):
            tax_rate = self._extract_tax_rate(source_code)
            if tax_rate > 10:
                flags.append(f"üî¥ Y√ºksek vergi oranƒ±: %{tax_rate}")

        if self._check_pattern_exists(source_code, self.risk_patterns["blacklist_functions"]):
            flags.append("üî¥ Blacklist fonksiyonu tespit edildi")

        if self._check_pattern_exists(source_code, self.risk_patterns["pause_functions"]):
            flags.append("üî¥ Trading durdurma fonksiyonu var")

        if self._check_honeypot_patterns(source_code):
            flags.append("üî¥ Honeypot kalƒ±plarƒ± tespit edildi")

        if not flags:
            flags.append("‚úÖ Kod analizi temiz")

        return flags

    def _check_pattern_exists(self, code, patterns):
        """Return True if any regex in *patterns* matches *code*, ignoring case."""
        return any(re.search(pattern, code, re.IGNORECASE) for pattern in patterns)

    def _check_ownership_renounced(self, code):
        """Return True if *code* contains text that looks like an ownership renounce.

        This only detects that such code exists, not that it was ever
        executed; an on-chain owner() lookup is the reliable source.
        """
        renounce_patterns = [
            r"renounceownership\s*\(\s*\)",
            r"_owner\s*=.*0x0+",
            r"owner.*=.*address\(0\)"
        ]
        return self._check_pattern_exists(code, renounce_patterns)

    def _extract_tax_rate(self, code):
        """Return the largest integer assigned after a "tax" identifier, or 0.

        The generic pattern uses a non-greedy ``.*?`` so that it captures the
        first numeric assignment following "tax" rather than the last one on
        the same line (``tax = 5; cap = 1000`` yields 5, not 1000).
        """
        tax_patterns = [
            r"buytax\s*=\s*(\d+)",
            r"selltax\s*=\s*(\d+)",
            r"tax.*?=\s*(\d+)"
        ]

        max_tax = 0
        for pattern in tax_patterns:
            for match in re.findall(pattern, code, re.IGNORECASE):
                try:
                    max_tax = max(max_tax, int(match))
                except ValueError:
                    print(f"  _extract_tax_rate ValueError: {match} int'e √ßevrilemedi.")

        return max_tax

    def _check_honeypot_patterns(self, code):
        """Return True if *code* matches any of the honeypot heuristics."""
        honeypot_patterns = [
            r"require.*balances\[.*\]\s*>=",  # balance requirement
            r"if.*balances\[.*\].*return false",  # transfer rejected by balance test
            r"mapping.*bool.*isexcluded",  # exclusion mapping
            r"tradingopen\s*=\s*false"  # trading switched off
        ]
        return self._check_pattern_exists(code, honeypot_patterns)

# Contents of utils/feature_engineering.py
class FeatureEngineer:
    """Turn raw on-chain and social signals into a numeric feature vector."""

    def build_features(self, onchain_data, social_data=None):
        """Build the 12-element feature array for the model.

        Args:
            onchain_data: Dict of on-chain signals; missing keys fall back to
                pessimistic defaults.
            social_data: Optional dict of social signals; when falsy, neutral
                defaults are used.

        Returns:
            A NumPy array ordered like ``get_feature_names()``.
        """
        holder_concentration = onchain_data.get("holder_concentration", 75)
        has_mint = int(onchain_data.get("mint_function", True))
        lp_locked = int(onchain_data.get("lp_locked", False))
        max_tax = onchain_data.get("max_tax", 25)
        renounced = int(onchain_data.get("owner_renounced", False))
        verified = int(onchain_data.get("contract_verified", False))

        # Contract age in days, capped at one year; 0 when the creation
        # timestamp is unknown.
        created_at = onchain_data.get("creation_date", 0)
        if created_at > 0:
            elapsed_days = (datetime.now().timestamp() - created_at) / 86400
            contract_age = min(elapsed_days, 365)
        else:
            contract_age = 0

        # Holder count on a log10 scale; counts below 1 are treated as 1.
        n_holders = onchain_data.get("holder_count", 1)
        log_holders = np.log10(max(n_holders, 1))

        if social_data:
            social_part = [
                social_data.get("twitter_mentions", 0),
                social_data.get("sentiment_score", 0.5) * 100,  # 0-100 scale
                int(social_data.get("bot_activity", False)),
                int(social_data.get("pump_dump_keywords", False)),
            ]
        else:
            social_part = [0, 50, 0, 0]

        vector = [
            holder_concentration,
            has_mint,
            lp_locked,
            max_tax,
            renounced,
            verified,
            contract_age,
            log_holders,
        ]
        vector += social_part
        return np.array(vector)

    def get_feature_names(self):
        """Return the feature names in the order used by ``build_features``."""
        onchain_names = [
            "holder_concentration",
            "mint_function",
            "lp_locked",
            "max_tax",
            "owner_renounced",
            "contract_verified",
            "age_days",
            "log_holder_count",
        ]
        social_names = [
            "twitter_mentions",
            "sentiment_score",
            "bot_activity",
            "pump_dump_keywords",
        ]
        return onchain_names + social_names

# Main ScamDetector class (previous contents of main.py)
class ScamDetector:
    """Combine on-chain, social-media and source-code checks into one risk report.

    While ``self.model`` is ``None`` the risk score comes from the rule-based
    ``_calculate_placeholder_risk`` heuristic; when a model is assigned, its
    ``predict_proba`` output replaces that score.
    """

    def __init__(self):
        """Instantiate the analysis helpers; no ML model is loaded."""
        self.onchain_checker = OnchainChecker()
        self.social_analyzer = SocialAnalyzer()
        self.code_analyzer = CodeAnalyzer()
        self.feature_engineer = FeatureEngineer()

        # Placeholder: no trained model is available yet.
        # TODO: once a model has been trained, load it here (for example with
        # joblib.load("models/scam_model.pkl")) and keep None if loading fails.
        self.model = None

    def analyze_contract(self, contract_address, token_name=""):
        """Analyze a token contract and return a risk report.

        Args:
            contract_address: Address of the contract to analyze.
            token_name: Optional token name forwarded to the social analysis.

        Returns:
            dict: On success contains ``contract``, ``token_name``,
            ``risk_score``, ``ml_prediction``, ``red_flags``,
            ``onchain_data``, ``social_data``, ``analysis_time`` and
            ``recommendations``. If any step raises, a reduced report with
            ``risk_score`` 100 and an ``error`` message is returned instead;
            it still carries ``contract``, ``token_name`` and
            ``analysis_time`` so callers can label the failed run.
        """
        print(f"üîç Analiz ba≈ülƒ±yor: {contract_address}")

        try:
            # 1. Collect on-chain data.
            print("üìä Onchain veriler alƒ±nƒ±yor...")
            onchain_features = self.onchain_checker.get_contract_features(contract_address)

            # 2. Social-media analysis; it needs a token name or an address.
            social_features = None
            if token_name or contract_address:
                print("üê¶ Sosyal medya analizi yapƒ±lƒ±yor...")
                social_features = self.social_analyzer.analyze_social_signals(token_name, contract_address)

            # 3. Source-code analysis.
            print("üíª Kontrat kodu analiz ediliyor...")
            # NOTE: this reaches into a private method of OnchainChecker. Per
            # the original note, a dummy checker yields an empty source string
            # and CodeAnalyzer is expected to cope with that.
            source_code = self.onchain_checker._get_contract_source(contract_address).get("source_code", "")
            code_flags = self.code_analyzer.analyze_contract_code(contract_address, source_code)

            # 4. Build the ML feature vector.
            print("‚öôÔ∏è √ñzellik m√ºhendisliƒüi yapƒ±lƒ±yor...")
            features = self.feature_engineer.build_features(onchain_features, social_features)

            # 5. Risk score: heuristic by default, overridden by the model.
            risk_score = self._calculate_placeholder_risk(onchain_features, code_flags)
            ml_prediction = "Tahmin Yok (Model Yok)"

            if self.model is not None:
                try:
                    # predict_proba is given a 2-D array holding one sample.
                    features_2d = np.array([features])
                    risk_probability = self.model.predict_proba(features_2d)[0][1]
                    risk_score = round(risk_probability * 100, 2)
                    if risk_probability > 0.7:
                        ml_prediction = "Scam"
                    elif risk_probability < 0.3:
                        ml_prediction = "G√ºvenli"
                    else:
                        ml_prediction = "Dikkatli ol"
                    print(f"‚úÖ Model tahmini yapƒ±ldƒ±: Risk Olasƒ±lƒ±ƒüƒ± = {risk_probability:.2f}, Tahmin: {ml_prediction}")
                except Exception as e:
                    # Keep the heuristic score and surface the model failure.
                    print(f"‚ùå Model tahmin hatasƒ±: {e}")
                    ml_prediction = f"Model Hatasƒ±: {e}"

            # 6. Assemble the report.
            return {
                "contract": contract_address,
                "token_name": token_name,
                "risk_score": risk_score,
                "ml_prediction": ml_prediction,
                "red_flags": code_flags,
                "onchain_data": onchain_features,
                "social_data": social_features,
                "analysis_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "recommendations": self._generate_recommendations(risk_score, onchain_features, code_flags)
            }

        except Exception as e:
            print(f"‚ùå Analiz hatasƒ±: {e}")
            import traceback
            traceback.print_exc()  # print the full error details

            # A failed analysis is reported as maximum risk. token_name and
            # analysis_time are included so the error report has the same
            # identifying fields as a successful one.
            return {
                "contract": contract_address,
                "token_name": token_name,
                "risk_score": 100,
                "ml_prediction": "Analiz edilemedi",
                "red_flags": [f"‚ùå Analiz hatasƒ±: {str(e)}"],
                "analysis_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error": str(e)
            }

    def _calculate_placeholder_risk(self, onchain_data, code_flags):
        """Compute a rule-based risk score, capped at 100.

        Args:
            onchain_data: Mapping of on-chain metrics; when it is falsy a flat
                50 points are added instead of the per-metric points.
            code_flags: Collection of flag strings from the code analysis.

        Returns:
            int: Sum of the points of every rule that fires, at most 100.
        """
        score = 0

        # Points derived from on-chain data.
        if onchain_data:
            if onchain_data.get("holder_concentration", 75) > 60:
                score += 20
            if onchain_data.get("mint_function", True):
                score += 20
            if not onchain_data.get("lp_locked", False):
                score += 20
            if onchain_data.get("max_tax", 25) > 15:
                score += 15
            if not onchain_data.get("owner_renounced", False):
                score += 15
            if not onchain_data.get("contract_verified", False):
                score += 10
        else:
            score += 50  # no on-chain data at all counts as risky

        # Points derived from code-analysis flags.
        if "üî¥ Mint fonksiyonu tespit edildi" in code_flags:
            score += 10
        if "üî¥ Owner haklarƒ± korunuyor" in code_flags:
            score += 10
        if "üî¥ Likidite √ßekme fonksiyonu var" in code_flags:
            score += 10
        # The high-tax flag embeds the rate, so match on the prefix only.
        if any("üî¥ Y√ºksek vergi oranƒ±" in flag for flag in code_flags):
            score += 10
        if "üî¥ Blacklist fonksiyonu tespit edildi" in code_flags:
            score += 15
        if "üî¥ Trading durdurma fonksiyonu var" in code_flags:
            score += 15
        if "üî¥ Honeypot kalƒ±plarƒ± tespit edildi" in code_flags:
            score += 25
        if "‚ö†Ô∏è Kontrat kodu doƒürulanamadƒ±" in code_flags:
            score += 20  # unverifiable code is treated as risky

        return min(score, 100)

    def _generate_recommendations(self, risk_score, onchain_data, code_flags):
        """Build the list of recommendation strings for a report.

        Args:
            risk_score: Numeric risk score; selects the headline band
                (>= 80, >= 60, >= 40, otherwise low).
            onchain_data: Mapping of on-chain metrics; skipped when falsy.
            code_flags: Collection of flag strings from the code analysis.

        Returns:
            list[str]: Headline recommendations first, then on-chain hints,
            then hints tied to individual code flags.
        """
        recommendations = []

        if risk_score >= 80:
            recommendations.append("üö® Y√úK¬≠SEK Rƒ∞SK: Bu token'a yatƒ±rƒ±m yapmayƒ±n!")
            recommendations.append("üí° Ba≈üka projeleri ara≈ütƒ±rƒ±n")
        elif risk_score >= 60:
            recommendations.append("‚ö†Ô∏è ORTA Rƒ∞SK: √áok dikkatli olun")
            recommendations.append("üí° K√º√ß√ºk miktarla test edin")
            recommendations.append("üí° Exit stratejinizi belirleyin")
        elif risk_score >= 40:
            recommendations.append("üü° D√ú¬≠≈û√úK-ORTA Rƒ∞SK: Ara≈ütƒ±rma yapƒ±n")
            recommendations.append("üí° Topluluk ve geli≈ütirici aktivitesini kontrol edin")
        else:
            recommendations.append("‚úÖ D√ú¬≠≈û√úK Rƒ∞SK: G√∂rece g√ºvenli g√∂r√ºn√ºyor")
            recommendations.append("üí° Yine de kendi ara≈ütƒ±rmanƒ±zƒ± yapƒ±n")

        # Hints for specific on-chain findings (only when data is present).
        if onchain_data:
            if not onchain_data.get("contract_verified", False):
                recommendations.append("üí° Kontrat kodu doƒürulanmamƒ±≈ü - ekstra dikkat")

            if onchain_data.get("holder_concentration", 0) > 50:
                recommendations.append("üí° Token daƒüƒ±lƒ±mƒ± centralized - whale riski")

            if not onchain_data.get("owner_renounced", False):
                recommendations.append("üí° Owner haklarƒ± hala aktif - kontrol riski")

        # Hints tied to individual code flags.
        if "üî¥ Mint fonksiyonu tespit edildi" in code_flags:
            recommendations.append("üí° Mint fonksiyonu, token arzƒ±nƒ±n artƒ±rƒ±labileceƒüi anlamƒ±na gelir - dikkatli olun.")
        if "üî¥ Likidite √ßekme fonksiyonu var" in code_flags:
            recommendations.append("üí° Likidite √ßekme fonksiyonu rug pull riskini artƒ±rƒ±r.")
        if any("üî¥ Y√ºksek vergi oranƒ±" in flag for flag in code_flags):
            recommendations.append("üí° Y√ºksek vergi oranlarƒ± alƒ±m satƒ±m maliyetini artƒ±rƒ±r ve scam g√∂stergesi olabilir.")
        if "üî¥ Blacklist fonksiyonu tespit edildi" in code_flags:
            recommendations.append("üí° Blacklist fonksiyonu, c√ºzdanlarƒ±n alƒ±m satƒ±m yapmasƒ±nƒ± engelleyebilir.")
        if "üî¥ Trading durdurma fonksiyonu var" in code_flags:
            recommendations.append("üí° Trading durdurma fonksiyonu, token'ƒ±n satƒ±lamaz hale gelmesine neden olabilir.")
        if "üî¥ Honeypot kalƒ±plarƒ± tespit edildi" in code_flags:
            recommendations.append("üí° Honeypot kalƒ±plarƒ±, token'ƒ± alƒ±p satmanƒ±zƒ± engelleyebilir (sadece alƒ±m yapƒ±labilir).")

        return recommendations


# Gradio Aray√ºz√º
def analyze_token_interface(contract_address, token_name):
    """Analyze a contract and format the report as Markdown for the Gradio UI.

    Free-text values (user input, flags, recommendations, error messages) are
    HTML-escaped before being embedded, because the output mixes Markdown
    with inline ``<span>`` markup.

    Args:
        contract_address: Contract address typed by the user.
        token_name: Token name typed by the user (may be empty).

    Returns:
        str: Markdown/HTML text describing the analysis result.
    """
    from html import escape  # local import keeps this function self-contained

    def text(value):
        """Escape &, < and > so dynamic text cannot inject markup."""
        return escape(str(value), quote=False)

    def first_color(item, table):
        """Return the colour of the first marker found in ``item``, else None."""
        return next((color for marker, color in table if marker in item), None)

    detector = ScamDetector()
    result = detector.analyze_contract(contract_address, token_name)

    # Collect the pieces and join once at the end.
    parts = [
        "## ü§ñ Kripto Token Risk Analiz Botu Sonu√ßlarƒ±\n\n",
        f"**üìß Kontrat:** {text(result.get('contract', 'N/A'))}\n",
        f"**üìõ Token Adƒ±:** {text(result.get('token_name', 'N/A'))}\n",
    ]

    # Risk score, coloured by band when it is numeric.
    risk_score = result.get('risk_score', 'N/A')
    if isinstance(risk_score, (int, float)):
        if risk_score >= 80:
            color, label = "red", "Y√ºksek Risk"
        elif risk_score >= 60:
            color, label = "orange", "Orta Risk"
        elif risk_score >= 40:
            color, label = "yellow", "D√º≈ü√ºk-Orta Risk"
        else:
            color, label = "green", "D√º≈ü√ºk Risk"
        parts.append(
            f"**üéØ Risk Skoru:** <span style='color: {color}; font-weight: bold;'>{risk_score}% ({label})</span>\n"
        )
    else:
        parts.append(f"**üéØ Risk Skoru:** {text(risk_score)}\n")

    parts.append(f"**ü§ñ ML Tahmini:** {text(result.get('ml_prediction', 'N/A'))}\n")
    parts.append(f"**üìÖ Analiz Zamanƒ±:** {result.get('analysis_time', 'Bilinmiyor')}\n\n")

    # Risk flags, coloured by their leading marker.
    parts.append("### üö© Risk Bayraklarƒ±:\n")
    flag_colors = (("üî¥", "red"), ("‚ö†Ô∏è", "orange"), ("‚úÖ", "green"))
    if result.get('red_flags'):
        for flag in result['red_flags']:
            color = first_color(flag, flag_colors)
            if color is None:
                parts.append(f"- {text(flag)}\n")
            else:
                parts.append(f"- <span style='color: {color};'>{text(flag)}</span>\n")
    else:
        parts.append("- Risk bayraƒüƒ± bulunamadƒ±.\n")
    parts.append("\n")

    # Recommendations; only the headline entries are coloured.
    parts.append("### üí° √ñneriler:\n")
    headline_colors = (
        ("üö® Y√úK¬≠SEK Rƒ∞SK", "red"),
        ("‚ö†Ô∏è ORTA Rƒ∞SK", "orange"),
        ("üü° D√ú¬≠≈û√úK-ORTA Rƒ∞SK", "yellow"),
        ("‚úÖ D√ú¬≠≈û√úK Rƒ∞SK", "green"),
    )
    if result.get('recommendations'):
        for rec in result['recommendations']:
            color = first_color(rec, headline_colors)
            if color is None:
                parts.append(f"- {text(rec)}\n")
            else:
                parts.append(
                    f"- <span style='color: {color}; font-weight: bold;'>{text(rec)}</span>\n"
                )
    else:
        parts.append("- √ñneri bulunamadƒ±.\n")
    parts.append("\n")

    if result.get('onchain_data'):
        data = result['onchain_data']
        parts.append("### üìä Onchain Veriler:\n")
        parts.append(f"- Holder Konsantrasyonu: %{data.get('holder_concentration', 0):.1f}\n")
        parts.append(f"- Mint Fonksiyonu: {'Var' if data.get('mint_function') else 'Yok'}\n")
        parts.append(f"- LP Kilidi: {'Var' if data.get('lp_locked') else 'Yok'}\n")
        parts.append(f"- Maksimum Tax: %{data.get('max_tax', 0)}\n")
        parts.append(f"- Owner Renounced: {'Evet' if data.get('owner_renounced') else 'Hayƒ±r'}\n")
        parts.append(f"- Kontrat Doƒürulanmƒ±≈ü: {'Evet' if data.get('contract_verified') else 'Hayƒ±r'}\n")
        # Render the creation timestamp as a readable date when present.
        creation_date_ts = data.get('creation_date', 0)
        if creation_date_ts > 0:
            creation_date_str = datetime.fromtimestamp(creation_date_ts).strftime("%Y-%m-%d %H:%M:%S")
            parts.append(f"- Kontrat Olu≈üturulma Tarihi: {creation_date_str}\n")
        else:
            parts.append("- Kontrat Olu≈üturulma Tarihi: Bilinmiyor\n")
        parts.append(f"- Toplam Arz: {data.get('total_supply', 0)}\n")
        parts.append(f"- Holder Sayƒ±sƒ±: {data.get('holder_count', 0)}\n")
        parts.append("\n")

    if result.get('social_data'):
        social_data = result['social_data']
        parts.append("### üê¶ Sosyal Medya Verileri:\n")
        parts.append(f"- Twitter Mention Sayƒ±sƒ± (Son 24s): {social_data.get('twitter_mentions', 0)}\n")
        parts.append(f"- Sentiment Skoru (0-100): {social_data.get('sentiment_score', 0):.1f}\n")
        parts.append(f"- Bot Aktivitesi Tespit Edildi: {'Evet' if social_data.get('bot_activity') else 'Hayƒ±r'}\n")
        parts.append(f"- Pump/Dump Anahtar Kelimeleri: {'Var' if social_data.get('pump_dump_keywords') else 'Yok'}\n")
        parts.append("\n")

    if result.get('error'):
        parts.append("### ‚ùå Hata:\n")
        parts.append(f"<span style='color: red;'>{text(result['error'])}</span>\n")

    return "".join(parts)

# Gradio Aray√ºz√ºn√º Olu≈ütur
if __name__ == "__main__":
    # Input and output widgets, created in the order they appear in the UI.
    address_box = gr.Textbox(label="Kontrat Adresi (0x...)")
    name_box = gr.Textbox(label="Token Adƒ± (Opsiyonel, Sosyal Medya Analizi ƒ∞√ßin)")
    results_view = gr.Markdown(label="Analiz Sonu√ßlarƒ±")

    # Wire the widgets to the analysis function.
    interface = gr.Interface(
        fn=analyze_token_interface,
        inputs=[address_box, name_box],
        outputs=results_view,
        title="ü§ñ Kripto Token Risk Analiz Botu",
        description="Bir kontrat adresini girerek token'ƒ±n on-chain ve sosyal medya risklerini analiz edin.",
    )

    # Start the UI; share=True is used so it can be reached when run in Colab.
    interface.launch(share=True)

Writing main.py


In [None]:
# Placeholder code for training and saving the model (can be included in main.py or kept as a separate script)
# from sklearn.model_selection import train_test_split
# from sklearn.ensemble import RandomForestClassifier
# import joblib
# import numpy as np

# # Sample data (must be replaced with real data)
# # features: [holder_concentration, mint_function, lp_locked, max_tax, owner_renounced, contract_verified, age_days, log_holder_count, twitter_mentions, sentiment_score, bot_activity, pump_dump_keywords]
# # labels: 0 (safe), 1 (scam)
# X = np.array([
#     [50, 0, 1, 5, 1, 1, 100, np.log10(1000), 500, 80, 0, 0], # G√ºvenli √∂rnek
#     [80, 1, 0, 20, 0, 0, 10, np.log10(50), 1000, 30, 1, 1],  # Scam √∂rnek
#     [30, 0, 1, 2, 1, 1, 500, np.log10(5000), 200, 90, 0, 0], # G√ºvenli √∂rnek
#     [95, 1, 0, 25, 0, 0, 5, np.log10(10), 2000, 10, 1, 1],   # Scam √∂rnek
# ])
# y = np.array([0, 1, 0, 1])

# # Modeli eƒüit
# # X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# # model = RandomForestClassifier(n_estimators=100, random_state=42)
# # model.fit(X_train, y_test) # Bug: y_train must be used here instead of y_test

# # D√ºzeltilmi≈ü eƒüitim
# # X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# # model = RandomForestClassifier(n_estimators=100, random_state=42)
# # model.fit(X_train, y_train)


# # Modeli kaydet
# # if not os.path.exists('models'):
# #     os.makedirs('models')
# # joblib.dump(model, 'models/scam_model.pkl')
# # print("Model eƒüitildi ve kaydedildi: models/scam_model.pkl")

In [None]:
!python main.py

Traceback (most recent call last):
  File "/content/main.py", line 2, in <module>
    from utils.onchain_checker import OnchainChecker
  File "/content/utils/onchain_checker.py", line 3, in <module>
    from config import Config
ModuleNotFoundError: No module named 'config'


# Task
Explain the error in the selected code, fix it if possible, and explain the corrected code. If fixing is not possible, diagnose the error. Then, explain the selected code. Finally, explain how to train a continuously learning scam detection model using the provided `main.py` and Gradio interface, and what steps to take if the current queries are not yielding results, noting that the Gradio interface is working.

## Check API keys and quotas

### Subtask:
Verify that the Infura, Etherscan, and Twitter API keys are correctly set up in Colab secrets and that the usage hasn't exceeded any free tier limits.


## Gather and prepare training data

### Subtask:
To train a scam detection model, you need a dataset of smart contracts labeled as either "scam" or "legitimate". This is the most crucial and often the most challenging step. Discuss strategies for obtaining or creating this data.


**Reasoning**:
Explain the need for a labeled dataset and outline strategies for obtaining or creating it, referencing the provided code structure.



In [None]:
# Prints a four-part explanation of why a labeled dataset is needed and how
# to build one. Each argument becomes one output line; a leading "\n" in an
# argument inserts a blank line before a new section.
print(
    # 1. Need for a Labeled Dataset
    "## 1. Need for a Labeled Dataset",
    "To train a supervised machine learning model, like the one intended for scam detection in main.py, a labeled dataset is absolutely essential.",
    "This dataset must contain examples of both 'scam' and 'legitimate' smart contracts.",
    "The model learns to identify patterns and features that distinguish between these two classes based on the provided examples.",
    "Without a sufficiently large, diverse, and accurately labeled dataset, it is impossible to train a model that can reliably predict whether a new, unseen contract is a scam or not.",
    # 2. Strategies for Acquiring or Creating the Dataset
    "\n## 2. Strategies for Acquiring or Creating the Dataset",
    "### Obtaining Scam Contract Data",
    "Acquiring a comprehensive and up-to-date list of scam contracts is challenging due to the dynamic nature of scams.",
    "Strategies include:",
    "- **Publicly Available Datasets:** Leveraging existing datasets compiled by security researchers, blockchain analytics firms, or community initiatives. Some platforms like Etherscan's 'Label Cloud' might tag known scam addresses.",
    "- **Community-Maintained Lists:** Exploring forums, social media groups, and websites where users report scam projects.",
    "- **Manual Identification:** Analyzing reported scams, rug pulls, and exploits to identify the associated contract addresses.",
    "Challenges include the potential for incomplete lists, outdated information, and the need to verify the legitimacy of reported scams.",
    "\n### Gathering Legitimate Contract Data",
    "Obtaining legitimate contract addresses is generally easier:",
    "- **Reputable Projects:** Collecting addresses of well-established tokens and protocols with a proven track record and strong community support (e.g., stablecoins, major DeFi protocols).",
    "- **Well-Established Tokens:** Including tokens listed on major exchanges and widely used in the ecosystem.",
    "- **Sampling Active Contracts:** Randomly sampling from a pool of active contracts on the blockchain, though this might require manual review to filter out potential scams that haven't been reported yet.",
    "\n### Labeling and Verification",
    "Once addresses are gathered, they need to be accurately labeled ('scam' or 'legitimate').",
    "This process requires careful verification, potentially involving:",
    "- **On-chain Analysis:** Examining transaction history, holder distribution, and code characteristics for suspicious patterns.",
    "- **Code Audits:** Reviewing available source code for malicious functions (as done by the CodeAnalyzer).",
    "- **Community Consensus:** Checking if the contract is widely considered legitimate or a scam by the community.",
    "Mislabeling is a significant risk and can negatively impact model performance.",
    # 3. Feature Extraction and Training Data
    "\n## 3. Feature Extraction and Training Data",
    "For each contract address in the labeled dataset, the features defined in the `FeatureEngineer` class would be extracted.",
    "These features include on-chain data (like holder concentration, mint function presence, liquidity lock status, max tax, ownership renouncement, contract verification, age, and holder count) and potentially social media signals.",
    "The extracted features for each contract, paired with its 'scam' or 'legitimate' label, would form the training data for the supervised machine learning model.",
    # 4. Significance of Data Gathering
    "\n## 4. Significance of Data Gathering",
    "It is crucial to understand that this data gathering, cleaning, and labeling process is a substantial undertaking.",
    "It is outside the scope of a single code execution and requires significant manual effort, domain expertise in blockchain and scams, and ongoing maintenance to keep the dataset current.",
    sep="\n",
)

## 1. Need for a Labeled Dataset
To train a supervised machine learning model, like the one intended for scam detection in main.py, a labeled dataset is absolutely essential.
This dataset must contain examples of both 'scam' and 'legitimate' smart contracts.
The model learns to identify patterns and features that distinguish between these two classes based on the provided examples.
Without a sufficiently large, diverse, and accurately labeled dataset, it is impossible to train a model that can reliably predict whether a new, unseen contract is a scam or not.

## 2. Strategies for Acquiring or Creating the Dataset
### Obtaining Scam Contract Data
Acquiring a comprehensive and up-to-date list of scam contracts is challenging due to the dynamic nature of scams.
Strategies include:
- **Publicly Available Datasets:** Leveraging existing datasets compiled by security researchers, blockchain analytics firms, or community initiatives. Some platforms like Etherscan's 'Label Cloud' might tag kno

In [None]:
!python main.py

Traceback (most recent call last):
  File "/content/main.py", line 2, in <module>
    from utils.onchain_checker import OnchainChecker
  File "/content/utils/onchain_checker.py", line 3, in <module>
    from config import Config
ModuleNotFoundError: No module named 'config'


In [None]:
!python main.py

Traceback (most recent call last):
  File "/content/main.py", line 2, in <module>
    from utils.onchain_checker import OnchainChecker
  File "/content/utils/onchain_checker.py", line 3, in <module>
    from config import Config
ModuleNotFoundError: No module named 'config'


In [None]:
!python main.py

Traceback (most recent call last):
  File "/content/main.py", line 2, in <module>
    from utils.onchain_checker import OnchainChecker
  File "/content/utils/onchain_checker.py", line 3, in <module>
    from config import Config
ModuleNotFoundError: No module named 'config'


In [None]:
%%writefile config.py
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    """API credentials and endpoints, read from environment variables."""

    # Each credential is None when its environment variable is unset.
    INFURA_PROJECT_ID = os.environ.get('INFURA_PROJECT_ID')
    ETHERSCAN_API_KEY = os.environ.get('ETHERSCAN_API_KEY')
    TWITTER_BEARER_TOKEN = os.environ.get('TWITTER_BEARER_TOKEN')
    # Infura mainnet endpoint built from the project id above.
    INFURA_URL = "https://mainnet.infura.io/v3/" + str(INFURA_PROJECT_ID)

Writing config.py


In [None]:
%%writefile utils/onchain_checker.py
import requests
from web3 import Web3
from config import Config
import json
import time

class OnchainChecker:
    def __init__(self):
        """Create the Web3 client for the Infura endpoint and report its status."""
        provider = Web3.HTTPProvider(Config.INFURA_URL)
        self.w3 = Web3(provider)
        self.etherscan_api = Config.ETHERSCAN_API_KEY

        # Only reports the connection status; a failed connection does not raise.
        if self.w3.is_connected():
            print("‚úÖ Infura baƒülantƒ±sƒ± ba≈üarƒ±lƒ±.")
        else:
            print("‚ùå Infura baƒülantƒ±sƒ± kurulamadƒ±! INFURA_PROJECT_ID'nizi kontrol edin.")


    def get_contract_features(self, address):
        """Collect the on-chain metrics for a contract, logging each step.

        Args:
            address: Contract address; it is validated and converted to its
                checksum form before use.

        Returns:
            dict: Keys ``holder_concentration``, ``mint_function``,
            ``lp_locked``, ``max_tax``, ``owner_renounced``,
            ``contract_verified``, ``creation_date``, ``total_supply`` and
            ``holder_count``. If the address is invalid or any step raises,
            the result of ``self._get_default_features()`` is returned.
        """
        try:
            # Reject anything that is not a valid address.
            if not self.w3.is_address(address):
                raise ValueError("Ge√ßersiz kontrat adresi")

            checksum = self.w3.to_checksum_address(address)
            print(f"üîç Analiz edilen adres (checksum): {checksum}")

            # Token information.
            print("-> Token bilgileri alƒ±nƒ±yor...")
            token_info = self._get_token_info(checksum)
            print(f"<- Token bilgileri: {token_info}")

            # Holder distribution.
            print("-> Holder daƒüƒ±lƒ±mƒ± alƒ±nƒ±yor...")
            holders = self._get_holder_distribution(checksum)
            holder_count = len(holders)
            print(f"<- Holder sayƒ±sƒ±: {holder_count}")

            # Contract source and verification status.
            print("-> Kontrat kaynak kodu alƒ±nƒ±yor...")
            source_info = self._get_contract_source(checksum)
            is_verified = source_info.get('verified', False)
            print(f"<- Kontrat doƒürulanmƒ±≈ü: {is_verified}")

            # Liquidity lock.
            print("-> Likidite kilidi kontrol ediliyor...")
            liquidity = self._check_liquidity_lock(checksum)
            lp_locked = liquidity.get('locked', False)
            print(f"<- LP kilitli: {lp_locked}")

            # Ownership.
            print("-> Owner renounce kontrol ediliyor...")
            renounced = self._check_ownership_renounced(checksum)
            print(f"<- Owner renounce edilmi≈ü: {renounced}")

            # Creation timestamp.
            print("-> Kontrat olu≈üturulma tarihi alƒ±nƒ±yor...")
            created_at = self._get_creation_date(checksum)
            print(f"<- Olu≈üturulma tarihi (timestamp): {created_at}")

            # Derived metrics, computed in the order they appear in the result.
            concentration = self._calculate_concentration(holders)
            has_mint = self._check_mint_function(source_info)
            max_tax = self._extract_max_tax(source_info)

            return {
                "holder_concentration": concentration,
                "mint_function": has_mint,
                "lp_locked": lp_locked,
                "max_tax": max_tax,
                "owner_renounced": renounced,
                "contract_verified": is_verified,
                "creation_date": created_at,
                "total_supply": token_info.get("total_supply", 0),
                "holder_count": holder_count
            }
        except Exception as e:
            # Best effort: log the failure and fall back to default features.
            print(f"‚ùå OnchainChecker hatasƒ±: {e}")
            return self._get_default_features()

    def _get_token_info(self, address):
        """Token temel bilgilerini al"""
        try:
            # ERC-20 ABI (sadece gerekli fonksiyonlar)
            erc20_abi = [
                {
                    "constant": True,
                    "inputs": [],
                    "name": "totalSupply",
                    "outputs": [{"name": "", "type": "uint256"}],
                    "type": "function"
                }
            ]

            contract = self.w3.eth.contract(address=address, abi=erc20_abi)
            total_supply = contract.functions.totalSupply().call()

            return {"total_supply": total_supply}
        except Exception as e:
            print(f"  _get_token_info hatasƒ±: {e}")
            return {"total_supply": 0}

    def _get_holder_distribution(self, address):
        """Request the token-holder list for ``address`` from the Etherscan API.

        The request asks for page 1 with an offset of 100.

        Returns:
            list: The ``result`` field of the response when its status is
            "1"; an empty list when the API reports a failure, the request
            fails, or any other exception occurs.
        """
        try:
            endpoint = "https://api.etherscan.io/api"
            query = {
                "module": "token",
                "action": "tokenholderlist",
                "contractaddress": address,
                "page": 1,
                "offset": 100,
                "apikey": self.etherscan_api
            }
            # The logged URL shows "..." in place of the API key.
            print(f"  Etherscan API √ßaƒürƒ±sƒ± (tokenholderlist): {endpoint}?module=token&action=tokenholderlist&contractaddress={address}&page=1&offset=100&apikey=...")

            # requests URL-encodes the query parameters itself.
            reply = requests.get(endpoint, params=query, timeout=15)
            reply.raise_for_status()  # turn HTTP error statuses into exceptions
            payload = reply.json()
            print(f"  Etherscan API yanƒ±tƒ± (tokenholderlist status): {payload.get('status')}, message: {payload.get('message')}")

            if payload.get("status") != "1":
                print(f"  Etherscan API'den holder bilgisi alƒ±namadƒ±: {payload.get('message', 'Bilinmeyen hata')}")
                return []
            return payload.get("result", [])
        except requests.exceptions.RequestException as e:
            print(f"  _get_holder_distribution istek hatasƒ±: {e}")
            return []
        except Exception as e:
            print(f"  _get_holder_distribution genel hata: {e}")
            return []


    def _calculate_concentration(self, holders):
        """Top 10 holder konsantrasyonunu hesapla"""
        if not holders:
            return 100  # Veri yoksa risk y√ºksek

        try:
            # ƒ∞lk 10 holder'ƒ±n toplam token y√ºzdesi
            # TokenHolderQuantity string gelebilir, float'a √ßevirirken hata olabilir
            top_10_balance = 0
            for h in holders[:10]:
                try:
                    top_10_balance += float(h.get("TokenHolderQuantity", 0))
                except ValueError:
                    print(f"  _calculate_concentration ValueError: {h.get('TokenHolderQuantity')} float'a √ßevrilemedi.")
                    continue # Hatalƒ± veriyi atla

            total_supply = 0
            for h in holders:
                 try:
                    total_supply += float(h.get("TokenHolderQuantity", 0))
                 except ValueError:
                     print(f"  _calculate_concentration ValueError: {h.get('TokenHolderQuantity')} float'a √ßevrilemedi.")
                     continue # Hatalƒ± veriyi atla


            if total_supply == 0:
                return 100

            concentration = (top_10_balance / total_supply) * 100
            return min(concentration, 100)  # Max %100
        except Exception as e:
            print(f"  _calculate_concentration genel hata: {e}")
            return 75  # Default deƒüer

    def _get_contract_source(self, address):
        """Fetch the contract's source code from the Etherscan API.

        Returns:
            dict: ``{"source_code": <str>, "verified": <bool>}``. ``verified``
            is True only when the returned source is non-empty and is not the
            "Contract source code not verified" placeholder. On an API-level
            failure or any exception the source is "" and ``verified`` False.
        """
        try:
            endpoint = "https://api.etherscan.io/api"
            query = {
                "module": "contract",
                "action": "getsourcecode",
                "address": address,
                "apikey": self.etherscan_api
            }
            # The logged URL shows "..." in place of the API key.
            print(f"  Etherscan API √ßaƒürƒ±sƒ± (getsourcecode): {endpoint}?module=contract&action=getsourcecode&address={address}&apikey=...")

            # requests URL-encodes the query parameters itself.
            reply = requests.get(endpoint, params=query, timeout=15)
            reply.raise_for_status()  # turn HTTP error statuses into exceptions
            payload = reply.json()
            print(f"  Etherscan API yanƒ±tƒ± (getsourcecode status): {payload.get('status')}, message: {payload.get('message')}")

            if not (payload.get("status") == "1" and payload.get("result")):
                print(f"  Etherscan API'den kaynak kodu alƒ±namadƒ±: {payload.get('message', 'Bilinmeyen hata')}")
                return {"source_code": "", "verified": False}

            source = payload["result"][0].get("SourceCode", "")
            # Also rule out the "not verified" placeholder text.
            is_verified = len(source) > 0 and source != "Contract source code not verified"
            return {"source_code": source, "verified": is_verified}

        except requests.exceptions.RequestException as e:
            print(f"  _get_contract_source istek hatasƒ±: {e}")
            return {"source_code": "", "verified": False}
        except Exception as e:
            print(f"  _get_contract_source genel hata: {e}")
            return {"source_code": "", "verified": False}


    def _check_mint_function(self, contract_data):
        """Mint fonksiyonu var mƒ± kontrol et"""
        source_code = contract_data.get("source_code", "").lower()
        mint_keywords = ["function mint", "mint(", "_mint(", "mintto", "mint_"]
        return any(keyword in source_code for keyword in mint_keywords)

    def _extract_max_tax(self, contract_data):
        """Maksimum tax oranƒ±nƒ± bulmaya √ßalƒ±≈ü"""
        source_code = contract_data.get("source_code", "").lower()

        # Yaygƒ±n tax variable isimleri
        tax_patterns = ["buytax", "selltax", "tax", "fee"]

        import re
        max_tax = 0
        for pattern in tax_patterns:
            # uint256 public buyTax = 5; gibi patternleri ara
            regex = rf"{pattern}\s*=\s*(\d+)"
            matches = re.findall(regex, source_code)
            for match in matches:
                 try:
                     max_tax = max(max_tax, int(match))
                 except ValueError:
                     print(f"  _extract_max_tax ValueError: {match} int'e √ßevrilemedi.")
                     continue # Hatalƒ± veriyi atla

        return max_tax # Default deƒüer


    def _check_ownership_renounced(self, address):
        """Owner renounce edilmi≈ü mi kontrol et"""
        try:
            # Owner fonksiyonu √ßaƒüƒ±r
            owner_abi = [{
                "constant": True,
                "inputs": [],
                "name": "owner",
                "outputs": [{"name": "", "type": "address"}],
                "type": "function"
            }]

            contract = self.w3.eth.contract(address=address, abi=owner_abi)
            owner = contract.functions.owner().call()

            # 0x000...000 adresine sahipse renounce edilmi≈ü
            return owner == "0x0000000000000000000000000000000000000000"
        except Exception as e:
            print(f"  _check_ownership_renounced hatasƒ±: {e}")
            return False

    def _check_liquidity_lock(self, address):
        """Likidite kilidi kontrol et (basit versiyon)"""
        # Bu ger√ßek implementasyonda Uniswap/PancakeSwap pool kontratlarƒ±nƒ± kontrol etmeli
        # Etherscan API'sinde doƒürudan likidite kilidi bilgisi yok, bu daha karma≈üƒ±k bir analiz gerektirir.
        # ≈ûimdilik sabit deƒüer d√∂nd√ºr√ºyoruz.
        print("  _check_liquidity_lock: Basit kontrol, her zaman False d√∂nd√ºr√ºyor.")
        return {"locked": False, "lock_time": 0}

    def _get_creation_date(self, address):
        """Return the timestamp of the first transaction listed for ``address``.

        Issues an Etherscan ``txlist`` request limited to a single entry in
        ascending order and returns that entry's ``timeStamp`` as an int.
        Returns 0 when the request fails, the API reports an error, or no
        entry comes back.
        """
        try:
            endpoint = "https://api.etherscan.io/api"
            query = dict(
                module="account",
                action="txlist",
                address=address,
                startblock=0,
                endblock=99999999,
                page=1,
                offset=1,
                sort="asc",
                apikey=self.etherscan_api,
            )
            # The key is elided in the log line; requests URL-encodes the real
            # parameters on its own.
            print(f"  Etherscan API √ßaƒürƒ±sƒ± (txlist): {endpoint}?module=account&action=txlist&address={address}&startblock=0&endblock=99999999&page=1&offset=1&sort=asc&apikey=...")

            response = requests.get(endpoint, params=query, timeout=15)
            response.raise_for_status()  # turn HTTP error statuses into exceptions
            payload = response.json()
            print(f"  Etherscan API yanƒ±tƒ± (txlist status): {payload.get('status')}, message: {payload.get('message')}")

            # Guard clause: anything but status "1" with a non-empty result
            # means the date could not be determined.
            if payload.get("status") != "1" or not payload.get("result"):
                print(f"  Etherscan API'den olu≈üturulma tarihi alƒ±namadƒ±: {payload.get('message', 'Bilinmeyen hata')}")
                return 0

            first_entry = payload["result"][0]
            return int(first_entry.get("timeStamp", 0))
        except requests.exceptions.RequestException as e:
            print(f"  _get_creation_date istek hatasƒ±: {e}")
            return 0
        except Exception as e:
            print(f"  _get_creation_date genel hata: {e}")
            return 0


    def _get_default_features(self):
        """Hata durumunda default deƒüerler"""
        return {
            "holder_concentration": 75,
            "mint_function": True,
            "lp_locked": False,
            "max_tax": 25,
            "owner_renounced": False,
            "contract_verified": False,
            "creation_date": 0,
            "total_supply": 0,
            "holder_count": 0
        }

Overwriting utils/onchain_checker.py


In [None]:
%%writefile utils/social_analysis.py
import requests
import tweepy
from config import Config
from datetime import datetime, timedelta

class SocialAnalyzer:
    """Derive coarse social-media risk signals for a token from recent tweets.

    ``twitter_client`` is a ``tweepy.Client`` when a bearer token is
    configured and the client could be created, otherwise ``None``. Without a
    client, ``analyze_social_signals`` returns its neutral defaults.
    """

    def __init__(self):
        """Create the Twitter client from ``Config.TWITTER_BEARER_TOKEN`` if set."""
        self.twitter_client = None
        if not Config.TWITTER_BEARER_TOKEN:
            print("‚ö†Ô∏è TWITTER_BEARER_TOKEN ayarlanmamƒ±≈ü. Sosyal medya analizi yapƒ±lamayacak.")
            return

        try:
            self.twitter_client = tweepy.Client(bearer_token=Config.TWITTER_BEARER_TOKEN)
            # Credentials are deliberately not verified here: a get_me() call
            # may count against the API quota.
            print("‚úÖ Twitter istemcisi ba≈ülatƒ±ldƒ±.")
        except Exception as e:
            print(f"‚ùå Twitter istemcisi ba≈ülatƒ±lƒ±rken hata olu≈ütu: {e}")
            self.twitter_client = None

    def analyze_social_signals(self, token_name, contract_address):
        """Search recent tweets about the token and summarise them.

        Args:
            token_name: Token name; searched as a quoted phrase when it has
                more than two characters after cleaning.
            contract_address: Fallback only; its first 10 characters become
                the search term when the name is unusable.

        Returns:
            Dict with ``twitter_mentions`` (int), ``sentiment_score`` (float
            in 0..1), ``bot_activity`` (bool) and ``pump_dump_keywords``
            (bool). Errors are logged, never raised; whatever was collected
            before the error is returned.
        """
        signals = {
            "twitter_mentions": 0,
            "sentiment_score": 0.5,  # neutral midpoint of the 0..1 range
            "bot_activity": False,
            "pump_dump_keywords": False
        }

        if not self.twitter_client:
            print("‚ö†Ô∏è Twitter istemcisi kullanƒ±lamiyor. Sosyal medya analizi atlandƒ±.")
            return signals

        try:
            final_query = self._build_query(token_name, contract_address)
            if final_query is None:
                print("‚ö†Ô∏è Twitter sorgusu i√ßin yeterli bilgi (token adƒ± veya kontrat adresi) yok.")
                return signals

            print(f"  Twitter API sorgusu: {final_query}")

            # No start_time filter is passed; max_results is kept small.
            tweets = self.twitter_client.search_recent_tweets(
                query=final_query,
                max_results=10,
                tweet_fields=["created_at", "author_id", "public_metrics", "text"],
            )

            if tweets and tweets.data:
                print(f"  Twitter API {len(tweets.data)} tweet buldu.")
                signals["twitter_mentions"] = len(tweets.data)
                signals["pump_dump_keywords"] = self._check_pump_dump_keywords(tweets.data)
                signals["bot_activity"] = self._detect_bot_activity(tweets.data)
                signals["sentiment_score"] = self._calculate_sentiment(tweets.data)
            else:
                print("  Twitter API tweet bulamadƒ±.")
                signals["twitter_mentions"] = 0

        except tweepy.errors.TweepyException as e:
            print(f"‚ùå Twitter API hatasƒ± (TweepyException): {e}")
            # Print the HTTP details when the exception carries a response.
            response = getattr(e, "response", None)
            if response is not None:
                print(f"  HTTP Status Code: {response.status_code}")
                try:
                    print(f"  Response Body: {response.json()}")
                except Exception:
                    # Body is not JSON: fall back to the raw text. Catching
                    # Exception (not a bare except) lets KeyboardInterrupt and
                    # SystemExit propagate.
                    print(f"  Response Body: {response.text}")

        except Exception as e:
            print(f"‚ùå Sosyal medya analizi genel hata: {e}")

        return signals

    def _build_query(self, token_name, contract_address):
        """Build the recent-search query, or return None if nothing is usable.

        The token name is preferred; the address prefix is used only when the
        name is missing or has two characters or fewer.
        """
        # Embedded double quotes would unbalance the quoted phrase below and
        # make the query invalid, so they are removed first.
        cleaned_name = token_name.replace('"', "").strip() if token_name else ""

        if len(cleaned_name) > 2:
            term = f'"{cleaned_name}"'
        elif contract_address and len(contract_address) > 10:
            term = contract_address[:10]
        else:
            return None

        # Retweets are excluded and results limited to English.
        return f"{term} -is:retweet lang:en"

    def _check_pump_dump_keywords(self, tweets):
        """Return True if any tweet text contains a pump-and-dump keyword.

        ``tweets`` is a list of objects exposing a ``text`` attribute.
        Matching is a case-insensitive substring search.
        """
        # NOTE(review): the second entry looks like a mis-encoded rocket
        # emoji; confirm the file encoding before relying on it.
        pump_keywords = [
            "moon", "üöÄ", "pump", "easy money", "quick profit",
            "100x", "1000x", "get rich", "diamond hands", "ape in"
        ]

        combined_text = " ".join([tweet.text.lower() for tweet in tweets])
        return any(keyword in combined_text for keyword in pump_keywords)

    def _detect_bot_activity(self, tweets):
        """Return True when fewer than half of the tweet texts are unique.

        Fewer than five tweets are considered too small a sample and always
        yield False.
        """
        if len(tweets) < 5:
            return False

        # Compare texts with surrounding whitespace removed.
        texts = [tweet.text.strip() for tweet in tweets]
        is_bot = len(set(texts)) / len(texts) < 0.5
        if is_bot:
            print("  Bot aktivitesi ≈ü√ºphesi: Tekrarlayan tweetler tespit edildi.")
        return is_bot

    def _calculate_sentiment(self, tweets):
        """Return the mean per-tweet sentiment in the range 0..1.

        Each tweet scores 1.0 when it contains more positive than negative
        keywords, 0.0 for the opposite and 0.5 on a tie. An empty input
        yields the neutral 0.5.
        """
        positive_words = ["good", "great", "amazing", "bullish", "buy", "long", "holding", "hodl", "strong"]
        negative_words = ["scam", "rug", "fake", "avoid", "dump", "short", "sell", "warning", "risk"]

        if not tweets:
            return 0.5

        total_score = 0
        for tweet in tweets:
            text = tweet.text.lower()
            pos_count = sum(word in text for word in positive_words)
            neg_count = sum(word in text for word in negative_words)

            if pos_count > neg_count:
                total_score += 1.0
            elif neg_count > pos_count:
                total_score += 0.0
            else:
                total_score += 0.5  # tie: neutral

        return total_score / len(tweets)

Overwriting utils/social_analysis.py


In [None]:
%%writefile utils/code_analysis.py
import re

class CodeAnalyzer:
    """Regex-based scanner that flags risky constructs in contract source code."""

    def __init__(self):
        """Set up the regex groups, keyed by risk category."""
        self.risk_patterns = {
            "mint_function": [
                r"function\s+mint\s*\(",
                r"function\s+_mint\s*\(",
                r"\.mint\s*\(",
                r"mintTo\s*\("
            ],
            "ownership_issues": [
                r"onlyOwner",
                r"_owner\s*=",
                r"transferOwnership",
                r"renounceOwnership"
            ],
            "liquidity_removal": [
                r"removeLiquidity",
                r"withdraw.*Liquidity",
                r"emergencyWithdraw"
            ],
            "tax_functions": [
                r"buyTax\s*=",
                r"sellTax\s*=",
                r"setTax",
                r"updateTax"
            ],
            "blacklist_functions": [
                r"blacklist",
                r"addBot",
                r"removeBot",
                r"setBot"
            ],
            "pause_functions": [
                r"pause\s*\(",
                r"unpause\s*\(",
                r"setPaused"
            ]
        }

    def analyze_contract_code(self, address, source_code=""):
        """Scan ``source_code`` and return a list of risk-flag strings.

        Args:
            address: Contract address. Kept for interface compatibility; the
                scan itself does not use it.
            source_code: Contract source text. An empty value is reported as
                a single "could not be verified" flag.

        Returns:
            The flags in a fixed order (mint, ownership, liquidity removal,
            tax, blacklist, pause, honeypot), or a single "clean" flag when
            nothing matched.
        """
        flags = []

        if not source_code:
            flags.append("‚ö†Ô∏è Kontrat kodu doƒürulanamadƒ±")
            return flags

        source_code = source_code.lower()
        patterns = self.risk_patterns

        # Mint function present.
        if self._check_pattern_exists(source_code, patterns["mint_function"]):
            flags.append("üî¥ Mint fonksiyonu tespit edildi")

        # Owner-related code present without any sign of renouncement.
        if (self._check_pattern_exists(source_code, patterns["ownership_issues"])
                and not self._check_ownership_renounced(source_code)):
            flags.append("üî¥ Owner haklarƒ± korunuyor")

        # Liquidity withdrawal functions.
        if self._check_pattern_exists(source_code, patterns["liquidity_removal"]):
            flags.append("üî¥ Likidite √ßekme fonksiyonu var")

        # Tax settings: only rates above 10 are flagged.
        if self._check_pattern_exists(source_code, patterns["tax_functions"]):
            tax_rate = self._extract_tax_rate(source_code)
            if tax_rate > 10:
                flags.append(f"üî¥ Y√ºksek vergi oranƒ±: %{tax_rate}")

        # Blacklist functions.
        if self._check_pattern_exists(source_code, patterns["blacklist_functions"]):
            flags.append("üî¥ Blacklist fonksiyonu tespit edildi")

        # Pause (trading halt) functions.
        if self._check_pattern_exists(source_code, patterns["pause_functions"]):
            flags.append("üî¥ Trading durdurma fonksiyonu var")

        # Honeypot heuristics.
        if self._check_honeypot_patterns(source_code):
            flags.append("üî¥ Honeypot kalƒ±plarƒ± tespit edildi")

        if not flags:
            flags.append("‚úÖ Kod analizi temiz")

        return flags

    def _check_pattern_exists(self, code, patterns):
        """Return True if any regex in ``patterns`` matches ``code`` (case-insensitive)."""
        return any(re.search(pattern, code, re.IGNORECASE) for pattern in patterns)

    def _check_ownership_renounced(self, code):
        """Return True if the source contains a renounce-style construct.

        NOTE(review): these patterns also match the mere definition of
        ``renounceOwnership()``, not evidence that it was ever called;
        confirm this is the intended meaning of "renounced".
        """
        renounce_patterns = [
            r"renounceownership\s*\(\s*\)",
            r"_owner\s*=.*0x0+",
            r"owner.*=.*address\(0\)"
        ]
        return self._check_pattern_exists(code, renounce_patterns)

    def _extract_tax_rate(self, code):
        """Return the largest integer assigned to a tax-like variable, or 0.

        Only ``<...tax...> = <digits>`` assignments are considered.
        """
        tax_patterns = [
            r"buytax\s*=\s*(\d+)",
            r"selltax\s*=\s*(\d+)",
            # Between "tax" and "=" allow only an identifier, member or index
            # suffix. A greedy ".*" here would capture the last "= <number>"
            # on the line (e.g. a total supply) and report it as the tax.
            r"tax[\w.\[\]]*\s*=\s*(\d+)"
        ]

        max_tax = 0
        for pattern in tax_patterns:
            for value in re.findall(pattern, code, re.IGNORECASE):
                max_tax = max(max_tax, int(value))

        return max_tax

    def _check_honeypot_patterns(self, code):
        """Return True if any honeypot heuristic pattern matches ``code``."""
        honeypot_patterns = [
            r"require.*balances\[.*\]\s*>=",  # balance requirement
            r"if.*balances\[.*\].*return false",  # transfer blocking
            r"mapping.*bool.*isexcluded",  # exclusion mapping
            r"tradingopen\s*=\s*false"  # trading disabled
        ]
        return self._check_pattern_exists(code, honeypot_patterns)

Overwriting utils/code_analysis.py


In [None]:
%%writefile utils/feature_engineering.py
import numpy as np
from datetime import datetime

class FeatureEngineer:
    """Turn raw on-chain and social readings into a numeric feature vector."""

    def build_features(self, onchain_data, social_data=None):
        """Assemble the model input vector.

        Args:
            onchain_data: Mapping of on-chain readings; missing keys fall
                back to built-in defaults.
            social_data: Optional mapping of social signals; when falsy,
                neutral defaults are used instead.

        Returns:
            A 1-D numpy array whose element order matches
            ``get_feature_names()``.
        """
        onchain = onchain_data.get

        # Base on-chain features; booleans are encoded as 0/1.
        vector = [
            onchain("holder_concentration", 75),
            int(onchain("mint_function", True)),
            int(onchain("lp_locked", False)),
            onchain("max_tax", 25),
            int(onchain("owner_renounced", False)),
            int(onchain("contract_verified", False)),
        ]

        # Contract age in days, capped at one year; 0 when the creation
        # timestamp is not positive.
        created_at = onchain("creation_date", 0)
        age_days = 0
        if created_at > 0:
            seconds_alive = datetime.now().timestamp() - created_at
            age_days = min(seconds_alive / 86400, 365)
        vector.append(age_days)

        # Holder count on a log10 scale; counts below 1 are treated as 1.
        holders = max(onchain("holder_count", 1), 1)
        vector.append(np.log10(holders))

        # Social-media features, with sentiment rescaled to 0-100.
        if social_data:
            social = social_data.get
            vector += [
                social("twitter_mentions", 0),
                social("sentiment_score", 0.5) * 100,
                int(social("bot_activity", False)),
                int(social("pump_dump_keywords", False)),
            ]
        else:
            vector += [0, 50, 0, 0]

        return np.array(vector)

    def get_feature_names(self):
        """Return the feature names in the order ``build_features`` emits them."""
        onchain_names = [
            "holder_concentration",
            "mint_function",
            "lp_locked",
            "max_tax",
            "owner_renounced",
            "contract_verified",
            "age_days",
            "log_holder_count",
        ]
        social_names = [
            "twitter_mentions",
            "sentiment_score",
            "bot_activity",
            "pump_dump_keywords",
        ]
        return onchain_names + social_names

Overwriting utils/feature_engineering.py


In [None]:
!python main.py

Running on local URL:  http://127.0.0.1:7860
IMPORTANT: You are using gradio version 3.45.0, however version 4.44.1 is available, please upgrade.
--------
Running on public URL: https://42d3af8503b28f836c.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)
❌ API anahtarları (Infura, Etherscan) ayarlanmamış! On-chain analiz yapılamayacak.
⚠️ TWITTER_BEARER_TOKEN ayarlanmamış. Sosyal medya analizi yapılamayacak.
🔍 Analiz başlıyor: 0xA0b86991c31CB32C05C6f5f1B0a5b4C2a5D4C0a6

📊 Onchain veriler alınıyor...
❌ API anahtarları eksik, on-chain analiz atlandı.
🐦 Sosyal medya analizi yapılıyor...
⚠️ Twitter istemcisi kullanılamiyor. Sosyal medya analizi atlandı.
💻 Kontrat kodu analiz ediliyor...
⚙️ Özellik mühendisliği yapılıyor...
Keyboard interruption in main thread... closing server.
Traceback (most recent call last):